Skip to content

Commit 27de1cb

Browse files
authored
Merge pull request #11 from clue-labs/all
Add all() helper to await successful fulfillment of all operations
2 parents 63778f8 + 034d1b6 commit 27de1cb

5 files changed

Lines changed: 561 additions & 1 deletion

File tree

README.md

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ much any API that already uses Promises.
4444
* [Promises](#promises)
4545
* [Timeout](#timeout)
4646
* [Streaming](#streaming)
47+
* [all()](#all)
4748
* [Install](#install)
4849
* [Tests](#tests)
4950
* [License](#license)
@@ -425,6 +426,112 @@ $uploader = new Transformer(10, function ($data) use ($http) {
425426
});
426427
```
427428

429+
#### all()
430+
431+
The static `all(ReadableStreamInterface $input, int $concurrency, callable $handler): PromiseInterface<int,Exception>` method can be used to
432+
concurrently process all jobs from the input stream through the given `$handler`.
433+
434+
This is a convenience method which uses the `Transformer` internally to
435+
schedule all jobs from the input stream while limiting concurrency to
436+
ensure no more than `$concurrency` jobs ever run at once. It will return
437+
a promise which resolves with the total number of all successful jobs
438+
on success.
439+
440+
```php
441+
$loop = React\EventLoop\Factory::create();
442+
$browser = new Clue\React\Buzz\Browser($loop);
443+
444+
$promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {
445+
return $browser->post($url, [], json_encode($data));
446+
});
447+
448+
$promise->then(function ($count) {
449+
echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
450+
});
451+
```
452+
453+
If either of the jobs fail, it will reject the resulting promise, will
454+
`close()` the input stream and will try to cancel all outstanding jobs.
455+
Calling `cancel()` on the pending promise will `close()` the input stream
456+
and will try to cancel all outstanding jobs. Similarly, if the `$input`
457+
stream emits an `error` event, it will reject the resulting promise and
458+
will try to cancel all outstanding jobs.
459+
460+
The `$input` parameter must be a `ReadableStreamInterface` which emits
461+
one `data` event for each job to process. Each element will be passed to
462+
the `$handler` to start one job. The fulfillment value for each job will
463+
be ignored, so for best performance it's recommended to not return any
464+
excessive data structures. When the stream emits an `end` or `close`
465+
event, this method will wait for all outstanding jobs to complete and
466+
then resolve with the number of successful jobs. If this stream is
467+
already closed or does not emit any `data` events, this method will
468+
resolve with a `0` value without processing any jobs.
469+
470+
```php
471+
$input = new ThroughStream();
472+
473+
$promise = Transformer::all($input, 2, $handler);
474+
475+
$input->write('a');
476+
$input->write('b');
477+
$input->write('c');
478+
$input->end();
479+
```
480+
481+
Because streams are one of the core abstractions of ReactPHP, a large number
482+
of stream implementations are available for many different use cases. For
483+
example, this allows you to use [clue/reactphp-ndjson](https://github.com/clue/reactphp-ndjson)
484+
or [clue/reactphp-csv](https://github.com/clue/reactphp-csv) to process
485+
large lists of structured input data. See also [streaming](#streaming) for
486+
more details.
487+
488+
The `$concurrency` parameter sets a new soft limit for the maximum number
489+
of jobs to handle concurrently. Finding a good concurrency limit depends
490+
on your particular use case. It's common to limit concurrency to a rather
491+
small value, as doing more than a dozen of things at once may easily
492+
overwhelm the receiving side. Using a `1` value will ensure that all jobs
493+
are processed one after another, effectively creating a "waterfall" of
494+
jobs. Using a value less than 1 will reject with an
495+
`InvalidArgumentException` without processing any jobs.
496+
497+
```php
498+
// handle up to 10 jobs concurrently
499+
$promise = Transformer::all($stream, 10, $handler);
500+
```
501+
502+
```php
503+
// handle each job after another without concurrency (waterfall)
504+
$promise = Transformer::all($stream, 1, $handler);
505+
```
506+
507+
The `$handler` parameter must be a valid callable that accepts your job
508+
parameter (the data from the `$input` stream), invokes the appropriate
509+
operation and returns a Promise as a placeholder for its future result.
510+
The fulfillment value for each job will be ignored, so for best
511+
performance it's recommended to not return any excessive data structures.
512+
If the given argument is not a valid callable, this method will reject
513+
with an `InvalidArgumentExceptionn` without processing any jobs.
514+
515+
```php
516+
// using a Closure as handler is usually recommended
517+
$promise = Transformer::all($stream, 10, function ($url) use ($browser) {
518+
return $browser->get($url);
519+
});
520+
```
521+
522+
```php
523+
// accepts any callable, so PHP's array notation is also supported
524+
$promise = Transformer::all($stream, 10, array($browser, 'get'));
525+
```
526+
527+
Note that this method returns a promise that resolves with the total
528+
number of successful operations only if all operations succeed. This
529+
is mostly a convenience method that uses the [`Transformer`](#transformer)
530+
under the hood. If your input data is small enough to fit into memory
531+
(a few dozens or hundreds of operations), you may want to use
532+
[clue/reactphp-mq](https://github.com/clue/reactphp-mq) instead and keep
533+
all operations in memory without using a streaming approach.
534+
428535
## Install
429536

430537
The recommended way to install this library is [through Composer](https://getcomposer.org).
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,4 @@ function (ResponseInterface $response) use ($user) {
5353
});
5454
$transformer->on('error', 'printf');
5555

56-
$loop->run();
56+
$loop->run();

examples/02-transform-all.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
use Clue\React\Flux\Transformer;
4+
use Psr\Http\Message\ResponseInterface;
5+
6+
require __DIR__ . '/../vendor/autoload.php';
7+
8+
$loop = React\EventLoop\Factory::create();
9+
$browser = new Clue\React\Buzz\Browser($loop);
10+
11+
$concurrency = isset($argv[1]) ? $argv[1] : 3;
12+
$url = isset($argv[2]) ? $argv[2] : 'http://httpbin.org/post';
13+
14+
// load a huge number of users to process from NDJSON file
15+
$input = new Clue\React\NDJson\Decoder(
16+
new React\Stream\ReadableResourceStream(
17+
fopen(__DIR__ . '/users.ndjson', 'r'),
18+
$loop
19+
),
20+
true
21+
);
22+
23+
// each job should use the browser to POST each user object to a certain URL
24+
// process all users by processing all users through transformer
25+
// limit number of concurrent jobs here
26+
$promise = Transformer::all($input, $concurrency, function ($user) use ($browser, $url) {
27+
return $browser->post(
28+
$url,
29+
array('Content-Type' => 'application/json'),
30+
json_encode($user)
31+
)->then(function (ResponseInterface $response) {
32+
// demo HTTP response validation
33+
$body = json_decode($response->getBody());
34+
if (!isset($body->json)) {
35+
throw new RuntimeException('Unexpected response');
36+
}
37+
});
38+
});
39+
40+
$promise->then(
41+
function ($count) {
42+
echo 'Successfully processed all ' . $count . ' user records' . PHP_EOL;
43+
},
44+
function (Exception $e) {
45+
echo 'An error occured: ' . $e->getMessage() . PHP_EOL;
46+
if ($e->getPrevious()) {
47+
echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL;
48+
}
49+
}
50+
);
51+
52+
$loop->run();

src/Transformer.php

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
use React\Stream\DuplexStreamInterface;
99
use React\Stream\Util;
1010
use React\Stream\WritableStreamInterface;
11+
use React\Stream\ReadableStreamInterface;
12+
use React\Promise;
13+
use React\Promise\Deferred;
14+
use React\Promise\PromiseInterface;
1115

1216
/**
1317
* The `Transformer` passes all input data through its transformation handler
@@ -296,6 +300,159 @@ final class Transformer extends EventEmitter implements DuplexStreamInterface
296300
private $promises = array();
297301
private $queued = array();
298302

303+
/**
304+
* Concurrently process all jobs from the input stream through the given `$handler`.
305+
*
306+
* This is a convenience method which uses the `Transformer` internally to
307+
* schedule all jobs from the input stream while limiting concurrency to
308+
* ensure no more than `$concurrency` jobs ever run at once. It will return
309+
* a promise which resolves with the total number of all successful jobs
310+
* on success.
311+
*
312+
* ```php
313+
* $loop = React\EventLoop\Factory::create();
314+
* $browser = new Clue\React\Buzz\Browser($loop);
315+
*
316+
* $promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {
317+
* return $browser->post($url, [], json_encode($data));
318+
* });
319+
*
320+
* $promise->then(function ($count) {
321+
* echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
322+
* });
323+
* ```
324+
*
325+
* If either of the jobs fail, it will reject the resulting promise, will
326+
* `close()` the input stream and will try to cancel all outstanding jobs.
327+
* Calling `cancel()` on the pending promise will `close()` the input stream
328+
* and will try to cancel all outstanding jobs. Similarly, if the `$input`
329+
* stream emits an `error` event, it will reject the resulting promise and
330+
* will try to cancel all outstanding jobs.
331+
*
332+
* The `$input` parameter must be a `ReadableStreamInterface` which emits
333+
* one `data` event for each job to process. Each element will be passed to
334+
* the `$handler` to start one job. The fulfillment value for each job will
335+
* be ignored, so for best performance it's recommended to not return any
336+
* excessive data structures. When the stream emits an `end` or `close`
337+
* event, this method will wait for all outstanding jobs to complete and
338+
* then resolve with the number of successful jobs. If this stream is
339+
* already closed or does not emit any `data` events, this method will
340+
* resolve with a `0` value without processing any jobs.
341+
*
342+
* ```php
343+
* $input = new ThroughStream();
344+
*
345+
* $promise = Transformer::all($input, 2, $handler);
346+
*
347+
* $input->write('a');
348+
* $input->write('b');
349+
* $input->write('c');
350+
* $input->end();
351+
* ```
352+
*
353+
* Because streams are one of the core abstractions of ReactPHP, a large number
354+
* of stream implementations are available for many different use cases. For
355+
* example, this allows you to use [clue/reactphp-ndjson](https://github.com/clue/reactphp-ndjson)
356+
* or [clue/reactphp-csv](https://github.com/clue/reactphp-csv) to process
357+
* large lists of structured input data. See also [streaming](#streaming) for
358+
* more details.
359+
*
360+
* The `$concurrency` parameter sets a new soft limit for the maximum number
361+
* of jobs to handle concurrently. Finding a good concurrency limit depends
362+
* on your particular use case. It's common to limit concurrency to a rather
363+
* small value, as doing more than a dozen of things at once may easily
364+
* overwhelm the receiving side. Using a `1` value will ensure that all jobs
365+
* are processed one after another, effectively creating a "waterfall" of
366+
* jobs. Using a value less than 1 will reject with an
367+
* `InvalidArgumentException` without processing any jobs.
368+
*
369+
* ```php
370+
* // handle up to 10 jobs concurrently
371+
* $promise = Transformer::all($stream, 10, $handler);
372+
* ```
373+
*
374+
* ```php
375+
* // handle each job after another without concurrency (waterfall)
376+
* $promise = Transformer::all($stream, 1, $handler);
377+
* ```
378+
*
379+
* The `$handler` parameter must be a valid callable that accepts your job
380+
* parameter (the data from the `$input` stream), invokes the appropriate
381+
* operation and returns a Promise as a placeholder for its future result.
382+
* The fulfillment value for each job will be ignored, so for best
383+
* performance it's recommended to not return any excessive data structures.
384+
* If the given argument is not a valid callable, this method will reject
385+
* with an `InvalidArgumentExceptionn` without processing any jobs.
386+
*
387+
* ```php
388+
* // using a Closure as handler is usually recommended
389+
* $promise = Transformer::all($stream, 10, function ($url) use ($browser) {
390+
* return $browser->get($url);
391+
* });
392+
* ```
393+
*
394+
* ```php
395+
* // accepts any callable, so PHP's array notation is also supported
396+
* $promise = Transformer::all($stream, 10, array($browser, 'get'));
397+
* ```
398+
*
399+
* Note that this method returns a promise that resolves with the total
400+
* number of successful operations only if all operations succeed. This
401+
* is mostly a convenience method that uses the [`Transformer`](#transformer)
402+
* under the hood. If your input data is small enough to fit into memory
403+
* (a few dozens or hundreds of operations), you may want to use
404+
* [clue/reactphp-mq](https://github.com/clue/reactphp-mq) instead and keep
405+
* all operations in memory without using a streaming approach.
406+
*
407+
* @param ReadableStreamInterface $input
408+
* @param int $concurrency
409+
* @param callable $callback
410+
* @return PromiseInterface Returns a Promise<int,Exception>
411+
*/
412+
public static function all(ReadableStreamInterface $input, $concurrency, $callback)
413+
{
414+
if (!$input->isReadable()) {
415+
return Promise\resolve(0);
416+
}
417+
418+
try {
419+
$stream = new self($concurrency, $callback);
420+
} catch (\InvalidArgumentException $e) {
421+
return Promise\reject($e);
422+
}
423+
424+
$deferred = new Deferred(function ($_, $reject) use ($input, $stream) {
425+
$reject(new \RuntimeException('Transformer cancelled'));
426+
$input->close();
427+
$stream->close();
428+
});
429+
430+
// forward input data through transformer until input stream ends/closes
431+
$input->pipe($stream);
432+
$input->on('close', array($stream, 'end'));
433+
434+
// count number of successful transformations and resolve with count on end
435+
$count = 0;
436+
$stream->on('data', function () use (&$count) {
437+
++$count;
438+
});
439+
$stream->on('end', function () use (&$count, $deferred) {
440+
$deferred->resolve($count);
441+
});
442+
443+
// input error or transformation error should reject result
444+
$input->on('error', function ($error) use ($deferred, $stream) {
445+
$deferred->reject($error);
446+
$stream->close();
447+
});
448+
$stream->on('error', function ($error) use ($deferred, $input) {
449+
$deferred->reject($error);
450+
$input->close();
451+
});
452+
453+
return $deferred->promise();
454+
}
455+
299456
/**
300457
* Instantiates a new Transformer instance.
301458
*

0 commit comments

Comments
 (0)