Plum

Plum is a data processing pipeline that helps you write structured, reusable and well-tested data processing code.


Index Workflow Readers Writers Filters Converters Extensions


Workflow

The workflow is the central element of the data processing pipeline provided by Plum. It is represented by the class Plum\Plum\Workflow. You attach filters, converters and writers to it and, when ready, process it by passing one or more readers to process(). Filters, converters and writers can be attached in any order, and the type of the items returned by readers does not matter to Plum.

Adding Converters, Filters, and Writers

The add*() methods offer two ways of adding converters, filters, and writers to the workflow. Either you pass the object as its first argument or you provide an array with options.

Converters

$converter = ...; // Instance of ConverterInterface or function
$workflow->addConverter($converter);
$workflow->addConverter([
    'converter' => $converter,
    'position'  => Workflow::APPEND,
]);

Learn more about adding converters.

Value Converters

$converter = ...; // Instance of ConverterInterface or function
$workflow->addConverter([
    'field'     => 'key',
    'converter' => $converter,
]);

Learn more about value converters.

Conditional Converters

$converter = ...; // Instance of ConverterInterface or function
$filter    = ...; // Instance of FilterInterface or function

$workflow->addConverter([
    'converter' => $converter,
    'filter'    => $filter,
]);
$workflow->addConverter([
    'converter'   => $converter,
    'filter'      => $filter,
    'filterField' => 'key'
]);

Learn more about conditional converters.

Filters

$filter = ...; // Instance of FilterInterface or function
$workflow->addFilter($filter);
$workflow->addFilter(['filter' => $filter]);

Learn more about adding filters.

Value Filters

$filter = ...; // Instance of FilterInterface or function
$workflow->addFilter([
    'field'  => 'key',
    'filter' => $filter,
]);

Learn more about value filters.

Writers

$writer = ...; // Instance of WriterInterface
$workflow->addWriter($writer);
$workflow->addWriter(['writer' => $writer]);

Learn more about adding writers.

Conditional Writers

$writer = ...; // Instance of WriterInterface
$filter = ...; // Instance of FilterInterface or function
$workflow->addWriter([
    'writer' => $writer,
    'filter' => $filter,
]);

Learn more about conditional writers.

Retrieving Converters, Filters and Writers

Workflow provides you with getters to retrieve elements from the pipeline.

$workflow->getPipeline(); // -> Plum\Plum\Pipe\Pipe[]

In addition there are methods to retrieve elements of a specific type, i.e., filters, converters and writers.

$workflow->getFilters(); // -> Plum\Plum\Pipe\FilterPipe[]
$workflow->getConverters(); // -> Plum\Plum\Pipe\ConverterPipe[]
$workflow->getWriters(); // -> Plum\Plum\Pipe\WriterPipe[]

Pipeline Order

The pipeline is processed strictly in the order in which the filters, converters and writers are added to the workflow. To control where a pipeline element is inserted, pass its position in the options array of the corresponding add*() method. There are two possible values:

  • Workflow::PREPEND
  • Workflow::APPEND

$workflow->addFilter(['filter' => $filter, 'position' => Workflow::PREPEND]);
$workflow->addConverter(['converter' => $converter, 'position' => Workflow::PREPEND]);
$workflow->addWriter(['writer' => $writer, 'position' => Workflow::APPEND]);
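
The effect of the two positions can be pictured with a plain PHP array. The following is only a minimal sketch of the idea and not Plum's implementation: the constants `PREPEND` and `APPEND` and the function `addElement()` are hypothetical stand-ins, and the pipeline elements are represented by simple strings.

```php
<?php
// Simplified stand-in for a pipeline; this is not Plum's implementation.
// PREPEND and APPEND are hypothetical stand-ins for Workflow::PREPEND and Workflow::APPEND.
const PREPEND = 'prepend';
const APPEND  = 'append';

// Hypothetical helper: returns the pipeline with $element inserted at $position.
function addElement(array $pipeline, string $element, string $position): array
{
    if ($position === PREPEND) {
        array_unshift($pipeline, $element); // element is processed first
    } else {
        $pipeline[] = $element;             // element is processed last
    }

    return $pipeline;
}

$pipeline = [];
$pipeline = addElement($pipeline, 'converter', APPEND);
$pipeline = addElement($pipeline, 'writer', APPEND);
$pipeline = addElement($pipeline, 'filter', PREPEND);

echo implode(' -> ', $pipeline), PHP_EOL; // filter -> converter -> writer
```

Although the filter is added last, prepending it means items are filtered before they reach the converter and the writer.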

Callback Converters and Filters

Plum\Plum\Converter\CallbackConverter is a converter that executes a callback to convert an item. When adding a converter or value converter to a Workflow you can just pass the callback and the workflow will automatically create a CallbackConverter. This works for both the direct as well as the array syntax.

$workflow->addConverter(function ($item) { return strtoupper($item); });
$workflow->addConverter([
    'converter' => function ($item) { return strtoupper($item); }
]);
$workflow->addConverter([
    'field'     => 'foo', 
    'converter' => function ($item) { return strtoupper($item); }
]);

Plum\Plum\Filter\CallbackFilter is a filter that executes a callback and filters the item based on the return value of the callback. Just like with converters, you can pass the callback directly and the workflow will create a CallbackFilter for you.

$workflow->addFilter(function ($item) { return !empty($item['price']); });
$workflow->addFilter([
    'filter' => function ($item) { return !empty($item['price']); }
]);
$workflow->addFilter([
    'field'  => 'foo', 
    'filter' => function ($item) { return $item === 'bar'; }
]);

Callbacks can also be used as filters in conditional converters.

Errors and Exceptions

By default, if a reader, filter, converter or writer throws an exception, Plum\Plum\Workflow does not handle it and processing stops. However, if you set the option resumeOnError to true, Plum catches all exceptions and continues processing the remaining items. The exceptions are stored in the Result object returned by the process() method and can be retrieved with the getExceptions() method. The getErrorCount() method returns the number of items that produced an exception.

use Plum\Plum\Workflow;

$workflow = new Workflow(['resumeOnError' => true]);
// Build workflow
$result = $workflow->process($reader);
$result->getErrorCount(); // -> int
$result->getExceptions(); // -> \Exception[]

Result

The process() method returns an instance of Plum\Plum\Result. This object contains information and errors collected during the processing.

$result = $workflow->process($reader);
$result->getReadCount(); // -> int
$result->getWriteCount(); // -> int
$result->getItemWriteCount(); // -> int
$result->getErrorCount(); // -> int
$result->getExceptions(); // -> \Exception[]

Plum counts two different types of writes. The write counter returned by getWriteCount() is increased every time an item is written. If you have 3 items and 2 writers in your workflow, the write counter will be 6. In contrast, the item write counter returned by getItemWriteCount() is increased only once for every item. That is, if you have 3 items and 2 writers in your workflow, the item write counter will be 3.
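
The arithmetic behind the two counters can be sketched in plain PHP. This is a simplified model and not Plum's implementation: `countWrites()` and `$noop` are hypothetical names, and the sketch assumes that every writer is unconditional, so each writer receives each item.

```php
<?php
// Simplified model of the two write counters; this is not Plum's implementation.
// Assumption: every writer is unconditional, so each writer receives each item.
function countWrites(array $items, array $writers): array
{
    $writeCount     = 0; // incremented for every single write
    $itemWriteCount = 0; // incremented at most once per item

    foreach ($items as $item) {
        $written = false;
        foreach ($writers as $writer) {
            $writer($item);
            $writeCount++;
            $written = true;
        }
        if ($written) {
            $itemWriteCount++;
        }
    }

    return [$writeCount, $itemWriteCount];
}

$noop = function ($item) {}; // hypothetical writer that discards the item

list($writeCount, $itemWriteCount) = countWrites(['a', 'b', 'c'], [$noop, $noop]);

echo "writes: $writeCount, item writes: $itemWriteCount", PHP_EOL;
// writes: 6, item writes: 3
```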

Concatenating Workflows

One of the most powerful features of Plum is the ability to concatenate workflows. The Plum\Plum\WorkflowConcatenator implements both the Plum\Plum\ReaderInterface and the Plum\Plum\WriterInterface. It must be added as a writer to the first workflow and passed as a reader to the second workflow.

use Plum\Plum\WorkflowConcatenator;

$concatenator = new WorkflowConcatenator();

// Add concatenator as writer to first workflow and process it.
$workflow1->addWriter($concatenator);
$workflow1->process($reader);

// Process the second workflow with the concatenator as reader.
$workflow2->process($concatenator);

Merging Data

You can process data from multiple readers with one call to process(). In practice you can use this to merge multiple data sources into a single target.

use Plum\Plum\Workflow;

$workflow = new Workflow();
$workflow->process([$reader1, $reader2]);

Splitting Data

Plum also allows you to split data from one source into multiple targets. You can leverage the power of conditional writers to achieve this.

In the following example $filter2 should be the negation of $filter1.

$workflow->addWriter(['writer' => $writer1, 'filter' => $filter1]);
$workflow->addWriter(['writer' => $writer2, 'filter' => $filter2]);
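
Since filters can be given as plain functions, one way to obtain such a pair of filters is to derive the second from the first. The following is a minimal sketch in plain PHP: the helper `negate()`, the price threshold and the sample items are hypothetical, and `array_filter()` stands in for the two conditional writers to show that every item ends up in exactly one target.

```php
<?php
// Hypothetical helper: wraps a filter callback and inverts its result.
function negate(callable $filter): callable
{
    return function ($item) use ($filter) {
        return !$filter($item);
    };
}

// Hypothetical filter: accepts items that cost 100 or more.
$filter1 = function ($item) { return $item['price'] >= 100; };
$filter2 = negate($filter1);

$items = [
    ['name' => 'a', 'price' => 50],
    ['name' => 'b', 'price' => 150],
    ['name' => 'c', 'price' => 100],
];

// array_filter() stands in for the two conditional writers.
$target1 = array_values(array_filter($items, $filter1));
$target2 = array_values(array_filter($items, $filter2));

echo count($target1), ' + ', count($target2), ' = ', count($items), PHP_EOL; // 2 + 1 = 3
```

Deriving `$filter2` from `$filter1` keeps the two conditions in sync, so changing the first filter cannot leave items that match both writers or neither.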
