Immutable, composable processor pipelines for the KaririCode Framework —
context-based registry, flexible spec format, structured error collection, PHP 8.4+.
Building reusable data-processing chains in PHP typically means either rigid class hierarchies or ad-hoc chains of function calls that are hard to test, configure, and compose:
// The old way: ad-hoc chain, hard to test or reuse
function processInput(string $input): string
{
    $input = trim($input);
    $input = strtolower($input);
    if (strlen($input) < 3) {
        throw new \InvalidArgumentException('Too short');
    }
    return $input;
}

No registry, no per-processor configuration, no error collection, no immutability: just imperative code you copy-paste everywhere.
use KaririCode\ProcessorPipeline\ProcessorRegistry;
use KaririCode\ProcessorPipeline\ProcessorBuilder;

// 1. Register processors once, per context
$registry = new ProcessorRegistry();
$registry
    ->register('sanitizer', 'trim', new TrimProcessor())
    ->register('sanitizer', 'lowercase', new LowercaseProcessor())
    ->register('validator', 'length', new LengthValidator());

// 2. Build immutable pipelines from specs
$builder = new ProcessorBuilder($registry);
$sanitized = $builder->buildPipeline('sanitizer', ['trim', 'lowercase']);
$validated = $builder->buildPipeline('validator', [
    'length' => ['minLength' => 3, 'maxLength' => 50],
]);

// 3. Execute (pipelines are immutable and reusable)
$output = $sanitized->process(' HELLO WORLD '); // 'hello world'
$validated->process($output);

| Requirement | Version |
|---|---|
| PHP | 8.4 or higher |
| kariricode/contract | ^2.8 |
| kariricode/exception | ^1.2 |
composer require kariricode/processor-pipeline

Define processors, register them, build a pipeline, and execute it:
<?php
declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use KaririCode\Contract\Processor\Processor;
use KaririCode\Contract\Processor\ConfigurableProcessor;
use KaririCode\ProcessorPipeline\ProcessorRegistry;
use KaririCode\ProcessorPipeline\ProcessorBuilder;

// 1. Define processors
final class TrimProcessor implements Processor
{
    public function process(mixed $input): mixed
    {
        // Non-string input passes through untouched
        return is_string($input) ? trim($input) : $input;
    }
}

final class LengthValidator implements ConfigurableProcessor
{
    private int $min = 0;
    private int $max = PHP_INT_MAX;

    public function configure(array $options): void
    {
        // Options that are not supplied keep their current values
        $this->min = $options['minLength'] ?? $this->min;
        $this->max = $options['maxLength'] ?? $this->max;
    }

    public function process(mixed $input): mixed
    {
        $len = mb_strlen((string) $input);
        if ($len < $this->min || $len > $this->max) {
            throw new \LengthException("Length must be between {$this->min} and {$this->max}.");
        }
        return $input;
    }
}

// 2. Register and build
$registry = new ProcessorRegistry();
$registry
    ->register('sanitizer', 'trim', new TrimProcessor())
    ->register('validator', 'length', new LengthValidator());

$builder = new ProcessorBuilder($registry);
$pipeline = $builder->buildPipeline('sanitizer', ['trim']);
$validate = $builder->buildPipeline('validator', [
    'length' => ['minLength' => 3, 'maxLength' => 50],
]);

// 3. Execute
$sanitized = $pipeline->process(' Hello, World! '); // 'Hello, World!'
$validate->process($sanitized); // passes: 13 chars
var_dump($sanitized); // string(13) "Hello, World!"

Pipeline is a readonly class. Adding a processor returns a new instance; the original is never modified:
$base = $builder->buildPipeline('sanitizer', ['trim']);
$extended = $base->withProcessor(new LowercaseProcessor());
// $base still has 1 processor
// $extended has 2 processors
assert($base->count() === 1);
assert($extended->count() === 2);

Processors are namespaced by context, so the same name can be reused across domains without collisions:
$registry->register('validator', 'email', new EmailValidator());
$registry->register('sanitizer', 'email', new EmailSanitizer()); // same name, different context
$validationPipeline = $builder->buildPipeline('validator', ['email']);
$sanitizationPipeline = $builder->buildPipeline('sanitizer', ['email']);

Build pipelines with simple lists, enable/disable flags, or per-processor configuration:
$pipeline = $builder->buildPipeline('validator', [
    'required',                                         // Simple: always enabled
    'trim' => true,                                     // Explicit: enabled
    'strict' => false,                                  // Explicit: disabled (skipped)
    'length' => ['minLength' => 3, 'maxLength' => 50],  // Configured
]);

ConfigurableProcessor allows per-build configuration without constructing new instances:
final class SlugProcessor implements ConfigurableProcessor
{
    private string $separator = '-';

    public function configure(array $options): void
    {
        $this->separator = $options['separator'] ?? '-';
    }

    public function process(mixed $input): mixed
    {
        return strtolower(str_replace(' ', $this->separator, trim((string) $input)));
    }
}

$pipeline = $builder->buildPipeline('formatter', [
    'slug' => ['separator' => '_'],
]);
$pipeline->process('Hello World'); // 'hello_world'

Wrap processors for non-halting error collection, which is useful in validation scenarios:
use KaririCode\ProcessorPipeline\Handler\ProcessorHandler;
use KaririCode\ProcessorPipeline\Result\ProcessingResultCollection;

$results = new ProcessingResultCollection();
$handler = new ProcessorHandler(
    processor: new EmailValidator(),
    resultCollection: $results,
    haltOnError: false, // continue on failure
);

$output = $handler->process('not-an-email'); // returns input unchanged

if ($results->hasErrors()) {
    foreach ($results->getErrors() as $processor => $errors) {
        foreach ($errors as $error) {
            echo "{$processor}: {$error['message']}\n";
        }
    }
}

Use #[Process] to declare pipelines directly on entity properties:
use KaririCode\ProcessorPipeline\Attribute\Process;

final class UserProfile
{
    #[Process(
        processors: ['trim', 'lowercase'],
        messages: [],
    )]
    public private(set) string $email = '';

    #[Process(
        processors: ['required', 'length' => ['minLength' => 3]],
        messages: ['required' => 'Username is required.'],
    )]
    public private(set) string $username = '';
}

All exceptions carry a context array for structured logging and tracing:
use KaririCode\ProcessorPipeline\Exception\PipelineExecutionException;
try {
    $pipeline->process($input);
} catch (PipelineExecutionException $e) {
    // $e->context['stage']: stage index where the failure occurred
    // $e->context['processorName']: FQCN of the failing processor
    // $e->getPrevious(): original exception
}

ProcessorBuilder::buildPipeline($context, $specs)
    │
    ▼
foreach spec entry:
    resolveSpec($key, $value)
    ├── int key, string value → name only (enabled, default config)
    ├── name => true          → enabled, no config
    ├── name => false         → SKIP
    └── name => [..]          → enabled + configure()
    │
    ▼
    ProcessorRegistry::get($context, $name)
    ConfigurableProcessor::configure($options)   ← if applicable
    $processors[] = $processor
    │
    ▼
new Pipeline($processors)   ← immutable, readonly

Pipeline::process($input)
    │
    ▼
foreach processor as $index => $p:
    try:
        $state = $p->process($state)
    catch \Throwable:
        throw PipelineExecutionException::atStage($p::class, $index, $cause)
    │
    ▼
return $state
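The two flows above can be sketched in plain PHP without the library. This is only an illustration under stated assumptions: `resolveSpecs()`, `runStages()`, and `StageFailure` are hypothetical names, stages are modeled as callables rather than `Processor` objects, and the real `ProcessorBuilder` and `Pipeline` may be structured differently.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a stage-aware exception (not the library class).
final class StageFailure extends \RuntimeException
{
    /** @param array<string, mixed> $context */
    public function __construct(string $message, public readonly array $context, \Throwable $previous)
    {
        parent::__construct($message, 0, $previous);
    }
}

/**
 * Resolve a spec array into name => options pairs, dropping disabled entries.
 *
 * @param array<int|string, mixed> $specs
 * @return array<string, array<string, mixed>>
 */
function resolveSpecs(array $specs): array
{
    $resolved = [];
    foreach ($specs as $key => $value) {
        if (is_int($key) && is_string($value)) {
            $resolved[$value] = [];      // list entry: enabled, default config
        } elseif (is_string($key) && $value === true) {
            $resolved[$key] = [];        // name => true: enabled, no config
        } elseif (is_string($key) && is_array($value)) {
            $resolved[$key] = $value;    // name => [...]: enabled with options
        }
        // name => false (and anything unrecognized) is skipped
    }
    return $resolved;
}

/**
 * Run stages in order, wrapping any failure with the index of the failing stage.
 *
 * @param list<callable(mixed): mixed> $stages
 */
function runStages(array $stages, mixed $input): mixed
{
    $state = $input;
    foreach ($stages as $index => $stage) {
        try {
            $state = $stage($state);
        } catch (\Throwable $cause) {
            throw new StageFailure("Stage {$index} failed.", ['stage' => $index], $cause);
        }
    }
    return $state;
}

$specs = resolveSpecs(['required', 'trim' => true, 'strict' => false, 'length' => ['minLength' => 3]]);
// $specs keeps 'required', 'trim', and 'length'; 'strict' is dropped.

$result = runStages(
    [fn (string $s): string => trim($s), fn (string $s): string => strtolower($s)],
    '  HELLO  ',
);
// $result is 'hello'
```

Wrapping every `\Throwable` at the loop level is what lets the caller learn which stage failed while still reaching the original exception through `getPrevious()`.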
src/
├── Attribute/
│ └── Process.php PHP 8.4 attribute for declarative pipelines
├── Exception/
│ ├── PipelineExecutionException.php Stage-aware failure with context array
│ ├── ProcessorNotFoundException.php Registry miss
│ ├── InvalidProcessorConfigurationException.php
│ └── ProcessorPipelineException.php Base exception
├── Handler/
│ └── ProcessorHandler.php Error-collecting processor wrapper
├── Pipeline/
│ └── Pipeline.php Immutable readonly sequential executor
├── Result/
│ └── ProcessingResultCollection.php Error + execution trace accumulator
├── ProcessorBuilder.php Factory: spec → Pipeline
└── ProcessorRegistry.php Context-based processor store
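As a rough picture of what the error-collecting wrapper under Handler/ does, the following sketch records failures instead of letting them stop the chain. `ErrorLog` and `collecting()` are hypothetical names invented for this example, and the stage is a plain callable; the library's `ProcessorHandler` and `ProcessingResultCollection` are not reproduced here and may behave differently.

```php
<?php
declare(strict_types=1);

// Hypothetical collector used only for this sketch.
final class ErrorLog
{
    /** @var array<string, list<string>> */
    private array $errors = [];

    public function add(string $source, string $message): void
    {
        $this->errors[$source][] = $message;
    }

    public function hasErrors(): bool
    {
        return $this->errors !== [];
    }

    /** @return array<string, list<string>> */
    public function all(): array
    {
        return $this->errors;
    }
}

/**
 * Wrap a stage so that a failure is recorded in the log.
 * With $haltOnError = false the input is returned unchanged and processing can continue.
 */
function collecting(string $name, callable $stage, ErrorLog $log, bool $haltOnError = false): \Closure
{
    return function (mixed $input) use ($name, $stage, $log, $haltOnError): mixed {
        try {
            return $stage($input);
        } catch (\Throwable $e) {
            $log->add($name, $e->getMessage());
            if ($haltOnError) {
                throw $e; // record first, then propagate
            }
            return $input;
        }
    };
}

$log = new ErrorLog();
$checkEmail = collecting('email', function (string $value): string {
    if (filter_var($value, FILTER_VALIDATE_EMAIL) === false) {
        throw new \InvalidArgumentException('Invalid email address.');
    }
    return $value;
}, $log);

$output = $checkEmail('not-an-email'); // the failure is recorded, the input comes back unchanged
```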
| Decision | Rationale | ADR |
|---|---|---|
| Immutable readonly Pipeline | Eliminates shared-state bugs; safe to reuse across requests | ADR-001 |
| Context-based registry | Prevents name collisions between validator/sanitizer/transformer domains | ADR-002 |
| Flexible spec format | Same interface for simple lists and richly-configured pipelines | ADR-003 |
| ProcessorHandler wrapper | Decouples error collection from processor logic; supports halt-or-continue | — |
| PipelineExecutionException with stage context | Structured observability: which stage, which processor, which cause | — |
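To make the first two decisions in the table concrete, here is a minimal sketch of a context-keyed registry and a pipeline that is never mutated after construction. `SketchRegistry` and `SketchPipeline` are hypothetical stand-ins written for this example, with callables in place of `Processor` objects; they are not the library's classes.

```php
<?php
declare(strict_types=1);

// Hypothetical registry: processors are stored under context, then name.
final class SketchRegistry
{
    /** @var array<string, array<string, callable>> */
    private array $byContext = [];

    public function register(string $context, string $name, callable $processor): self
    {
        $this->byContext[$context][$name] = $processor;
        return $this; // fluent, so registrations can be chained
    }

    public function get(string $context, string $name): callable
    {
        return $this->byContext[$context][$name]
            ?? throw new \OutOfBoundsException("No '{$name}' processor in context '{$context}'.");
    }
}

// Hypothetical pipeline: the stage list is readonly, and "adding" builds a new object.
final class SketchPipeline
{
    /** @param list<callable> $stages */
    public function __construct(private readonly array $stages = [])
    {
    }

    public function with(callable $stage): self
    {
        return new self([...$this->stages, $stage]); // copy, never mutate
    }

    public function count(): int
    {
        return count($this->stages);
    }

    public function process(mixed $input): mixed
    {
        return array_reduce(
            $this->stages,
            fn (mixed $state, callable $stage): mixed => $stage($state),
            $input,
        );
    }
}

// The same name, 'email', lives in two contexts without colliding.
$registry = (new SketchRegistry())
    ->register('sanitizer', 'email', fn (string $v): string => strtolower(trim($v)))
    ->register('validator', 'email', fn (string $v): bool => filter_var($v, FILTER_VALIDATE_EMAIL) !== false);

$base = new SketchPipeline([$registry->get('sanitizer', 'email')]);
$extended = $base->with($registry->get('validator', 'email'));
// $base still has one stage; $extended has two.
```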
| Spec | Covers |
|---|---|
| SPEC-001 | Full pipeline: registry → builder → execution → error collection |
ProcessorPipeline is the execution engine used internally by other KaririCode components:
| Component | Role |
|---|---|
| kariricode/validator | Builds validation pipelines from #[Validate] attributes |
| kariricode/sanitizer | Builds sanitization pipelines from #[Sanitize] attributes |
| kariricode/transformer | Builds transformation pipelines from #[Transform] attributes |
| kariricode/property-inspector | Discovers #[Process] attributes and dispatches to pipeline handlers |
Any component that needs configurable, composable processing chains can be built on top of this engine.
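As an illustration of how a component might discover attribute-declared specs before handing them to a builder, the sketch below reads a property attribute with PHP's reflection API. `SketchProcess`, `SignupForm`, and `discoverSpecs()` are hypothetical and only loosely modeled on the #[Process] shape shown earlier; how kariricode/property-inspector actually performs discovery is not covered here.

```php
<?php
declare(strict_types=1);

// Hypothetical attribute carrying a processor spec.
#[\Attribute(\Attribute::TARGET_PROPERTY)]
final class SketchProcess
{
    /** @param array<int|string, mixed> $processors */
    public function __construct(public readonly array $processors = [])
    {
    }
}

final class SignupForm
{
    #[SketchProcess(processors: ['trim', 'lowercase'])]
    public string $email = '';

    #[SketchProcess(processors: ['required', 'length' => ['minLength' => 3]])]
    public string $username = '';

    public string $note = ''; // no attribute, so discovery ignores it
}

/**
 * Collect the spec declared on each attributed property.
 *
 * @return array<string, array<int|string, mixed>> property name => spec
 */
function discoverSpecs(object $subject): array
{
    $found = [];
    foreach ((new \ReflectionObject($subject))->getProperties() as $property) {
        foreach ($property->getAttributes(SketchProcess::class) as $attribute) {
            $found[$property->getName()] = $attribute->newInstance()->processors;
        }
    }
    return $found;
}

$specs = discoverSpecs(new SignupForm());
// $specs has entries for 'email' and 'username', each holding the declared spec array.
```

Each discovered spec has the same shape that `buildPipeline()` accepts in the examples above, which is what would let an attribute-driven component reuse the builder unchanged.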
| Metric | Value |
|---|---|
| PHP source files | 8 |
| External runtime dependencies | 2 (contract · exception) |
| Test suite | 128 tests · 234 assertions |
| PHPStan level | 9 |
| Code coverage | 100% classes / methods / lines |
| PHP version | 8.4+ |
| ARFA compliance | 1.3 |
| Test suites | Unit + Integration |
git clone https://github.com/KaririCode-Framework/kariricode-processor-pipeline.git
cd kariricode-processor-pipeline
composer install
kcode init
kcode quality # Must pass before opening a PR

Part of the KaririCode Framework ecosystem.
kariricode.org · GitHub · Packagist · Issues