Skip to content

Commit 48d6a45

Browse files
authored
Refactor simplify pipelines (#2193)
* refactor: remove legacy clone from DataFrame
* refactor: simplify DataFrame Pipeline
  - replaced Pipeline interface with Processor interface
  - migrated Pipeline implementations to Processors
  - simplified building multi-stage pipelines
* fix: regenerated API references
1 parent fb055a9 commit 48d6a45

68 files changed

Lines changed: 2370 additions & 1720 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

src/core/etl/src/Flow/ETL/DataFrame.php

Lines changed: 54 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace Flow\ETL;
66

7-
use function Flow\ETL\DSL\{analyze, refs, to_output};
7+
use function Flow\ETL\DSL\{refs, to_output};
88
use Flow\ETL\DataFrame\GroupedDataFrame;
99
use Flow\ETL\Dataset\Report;
1010
use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException};
@@ -20,19 +20,18 @@
2020
use Flow\ETL\Join\{Expression, Join};
2121
use Flow\ETL\Loader\SchemaValidationLoader;
2222
use Flow\ETL\Loader\StreamLoader\Output;
23-
use Flow\ETL\Pipeline\{BatchingByPipeline,
24-
BatchingPipeline,
25-
CachingPipeline,
26-
CollectingPipeline,
27-
ConstrainedPipeline,
28-
GroupByPipeline,
29-
HashJoinPipeline,
30-
LinkedPipeline,
31-
OffsetPipeline,
32-
PartitioningPipeline,
33-
SortingPipeline,
34-
VoidPipeline,
35-
WindowFunctionPipeline};
23+
use Flow\ETL\Processor\{BatchingByProcessor,
24+
BatchingProcessor,
25+
CachingProcessor,
26+
CollectingProcessor,
27+
ConstrainedProcessor,
28+
GroupByProcessor,
29+
HashJoinProcessor,
30+
OffsetProcessor,
31+
PartitioningProcessor,
32+
SortingProcessor,
33+
VoidProcessor,
34+
WindowProcessor};
3635
use Flow\ETL\Row\{EntryReference, Formatter\ASCIISchemaFormatter, Reference, References};
3736
use Flow\ETL\Schema\{Definition, SchemaFormatter};
3837
use Flow\ETL\Schema\Validator\StrictValidator;
@@ -57,14 +56,13 @@
5756
ScalarFunctionFilterTransformer,
5857
ScalarFunctionTransformer,
5958
SelectEntriesTransformer,
60-
UntilTransformer
61-
};
59+
UntilTransformer};
6260
use Flow\Filesystem\Path\Filter;
6361
use Flow\Types\Type\AutoCaster;
6462

6563
final class DataFrame
6664
{
67-
private FlowContext $context;
65+
private readonly FlowContext $context;
6866

6967
public function __construct(private Pipeline $pipeline, Config|FlowContext $context)
7068
{
@@ -79,7 +77,7 @@ public function aggregate(AggregatingFunction ...$aggregations) : self
7977
$groupBy = new GroupBy();
8078
$groupBy->aggregate(...$aggregations);
8179

82-
$this->pipeline = new LinkedPipeline(new GroupByPipeline($groupBy, $this->pipeline));
80+
$this->pipeline->add(new GroupByProcessor($groupBy));
8381

8482
return $this;
8583
}
@@ -108,7 +106,7 @@ public function autoCast() : self
108106
*/
109107
public function batchBy(string|Reference $column, ?int $minSize = null) : self
110108
{
111-
$this->pipeline = new LinkedPipeline(new BatchingByPipeline($this->pipeline, EntryReference::init($column), $minSize));
109+
$this->pipeline->add(new BatchingByProcessor(EntryReference::init($column), $minSize));
112110

113111
return $this;
114112
}
@@ -132,7 +130,7 @@ public function batchSize(int $size) : self
132130
return $this->collect();
133131
}
134132

135-
$this->pipeline = new LinkedPipeline(new BatchingPipeline($this->pipeline, $size));
133+
$this->pipeline->add(new BatchingProcessor($size));
136134

137135
return $this;
138136
}
@@ -160,11 +158,11 @@ public function cache(?string $id = null, ?int $cacheBatchSize = null) : self
160158
}
161159

162160
if ($cacheBatchSize) {
163-
$this->pipeline = new LinkedPipeline(new CachingPipeline(new BatchingPipeline($this->pipeline, $cacheBatchSize), $id));
164-
} else {
165-
$this->pipeline = new LinkedPipeline(new CachingPipeline($this->pipeline, $id));
161+
$this->pipeline->add(new BatchingProcessor($cacheBatchSize));
166162
}
167163

164+
$this->pipeline->add(new CachingProcessor($id));
165+
168166
return $this;
169167
}
170168

@@ -176,7 +174,7 @@ public function cache(?string $id = null, ?int $cacheBatchSize = null) : self
176174
*/
177175
public function collect() : self
178176
{
179-
$this->pipeline = new LinkedPipeline(new CollectingPipeline($this->pipeline));
177+
$this->pipeline->add(new CollectingProcessor());
180178

181179
return $this;
182180
}
@@ -210,7 +208,7 @@ public function constrain(Constraint $constraint, Constraint ...$constraints) :
210208
{
211209
$constraints = \array_merge([$constraint], $constraints);
212210

213-
$this->pipeline = new LinkedPipeline(new ConstrainedPipeline($this->pipeline, $constraints));
211+
$this->pipeline->add(new ConstrainedProcessor($constraints));
214212

215213
return $this;
216214
}
@@ -221,11 +219,9 @@ public function constrain(Constraint $constraint, Constraint ...$constraints) :
221219
*/
222220
public function count() : int
223221
{
224-
$clone = clone $this;
225-
226222
$total = 0;
227223

228-
foreach ($clone->pipeline->process($clone->context) as $rows) {
224+
foreach ($this->pipeline->process($this->context) as $rows) {
229225
$total += $rows->count();
230226
}
231227

@@ -254,12 +250,11 @@ public function crossJoin(self $dataFrame, string $prefix = '') : self
254250
*/
255251
public function display(int $limit = 20, int|bool $truncate = 20, Formatter $formatter = new AsciiTableFormatter()) : string
256252
{
257-
$clone = clone $this;
258-
$clone->limit($limit);
253+
$this->limit($limit);
259254

260255
$output = '';
261256

262-
foreach ($clone->pipeline->process($clone->context) as $rows) {
257+
foreach ($this->pipeline->process($this->context) as $rows) {
263258
$output .= $formatter->format($rows, $truncate);
264259
}
265260

@@ -327,15 +322,13 @@ public function duplicateRow(mixed $condition, WithEntry ...$entries) : self
327322
*/
328323
public function fetch(?int $limit = null) : Rows
329324
{
330-
$clone = clone $this;
331-
332325
if ($limit !== null) {
333-
$clone->limit($limit);
326+
$this->limit($limit);
334327
}
335328

336329
$rows = new Rows();
337330

338-
foreach ($clone->pipeline->process($clone->context) as $nextRows) {
331+
foreach ($this->pipeline->process($this->context) as $nextRows) {
339332
$rows = $rows->merge($nextRows);
340333
}
341334

@@ -359,7 +352,7 @@ public function filter(ScalarFunction $function) : self
359352
*/
360353
public function filterPartitions(Filter|ScalarFunction $filter) : self
361354
{
362-
$extractor = $this->pipeline->source();
355+
$extractor = $this->pipeline->extractor();
363356

364357
if (!$extractor instanceof FileExtractor) {
365358
throw new RuntimeException('filterPartitions can be used only with extractors that implement FileExtractor interface');
@@ -404,8 +397,7 @@ public function filters(array $functions) : self
404397
*/
405398
public function forEach(?callable $callback = null) : void
406399
{
407-
$clone = clone $this;
408-
$clone->run($callback);
400+
$this->run($callback);
409401
}
410402

411403
/**
@@ -417,9 +409,7 @@ public function forEach(?callable $callback = null) : void
417409
*/
418410
public function get() : \Generator
419411
{
420-
$clone = clone $this;
421-
422-
return $clone->pipeline->process($clone->context);
412+
return $this->pipeline->process($this->context);
423413
}
424414

425415
/**
@@ -431,9 +421,7 @@ public function get() : \Generator
431421
*/
432422
public function getAsArray() : \Generator
433423
{
434-
$clone = clone $this;
435-
436-
foreach ($clone->pipeline->process($clone->context) as $rows) {
424+
foreach ($this->pipeline->process($this->context) as $rows) {
437425
yield $rows->toArray();
438426
}
439427
}
@@ -447,9 +435,7 @@ public function getAsArray() : \Generator
447435
*/
448436
public function getEach() : \Generator
449437
{
450-
$clone = clone $this;
451-
452-
foreach ($clone->pipeline->process($clone->context) as $rows) {
438+
foreach ($this->pipeline->process($this->context) as $rows) {
453439
foreach ($rows as $row) {
454440
yield $row;
455441
}
@@ -465,9 +451,7 @@ public function getEach() : \Generator
465451
*/
466452
public function getEachAsArray() : \Generator
467453
{
468-
$clone = clone $this;
469-
470-
foreach ($clone->pipeline->process($clone->context) as $rows) {
454+
foreach ($this->pipeline->process($this->context) as $rows) {
471455
foreach ($rows as $row) {
472456
yield $row->toArray();
473457
}
@@ -491,7 +475,7 @@ public function join(self $dataFrame, Expression $on, string|Join $type = Join::
491475
$type = Join::from($type);
492476
}
493477

494-
$this->pipeline = new LinkedPipeline(new HashJoinPipeline($this->pipeline, $dataFrame, $on, $type));
478+
$this->pipeline->add(new HashJoinProcessor($dataFrame, $on, $type));
495479

496480
return $this;
497481
}
@@ -611,7 +595,7 @@ public function offset(?int $offset) : self
611595
return $this;
612596
}
613597

614-
$this->pipeline = new LinkedPipeline(new OffsetPipeline($this->pipeline, $offset));
598+
$this->pipeline->add(new OffsetProcessor($offset));
615599

616600
return $this;
617601
}
@@ -633,18 +617,20 @@ public function partitionBy(string|Reference $entry, string|Reference ...$entrie
633617
{
634618
\array_unshift($entries, $entry);
635619

636-
$this->pipeline = new LinkedPipeline(new PartitioningPipeline($this->pipeline, References::init(...$entries)->all()));
620+
$this->pipeline->add(new PartitioningProcessor(References::init(...$entries)->all()));
637621

638622
return $this;
639623
}
640624

641625
public function pivot(Reference $ref) : self
642626
{
643-
if (!$this->pipeline instanceof GroupByPipeline) {
627+
$processor = $this->pipeline->stages()->current()->processor();
628+
629+
if (!$processor instanceof GroupByProcessor) {
644630
throw new RuntimeException('Pivot can be used only after groupBy');
645631
}
646632

647-
$this->pipeline->groupBy->pivot($ref);
633+
$processor->groupBy->pivot($ref);
648634

649635
return $this;
650636
}
@@ -654,30 +640,26 @@ public function pivot(Reference $ref) : self
654640
*/
655641
public function printRows(?int $limit = 20, int|bool $truncate = 20, Formatter $formatter = new AsciiTableFormatter()) : void
656642
{
657-
$clone = clone $this;
658-
659643
if ($limit !== null) {
660-
$clone->limit($limit);
644+
$this->limit($limit);
661645
}
662646

663-
$clone->load(to_output($truncate, Output::rows, $formatter));
647+
$this->load(to_output($truncate, Output::rows, $formatter));
664648

665-
$clone->run();
649+
$this->run();
666650
}
667651

668652
/**
669653
* @trigger
670654
*/
671655
public function printSchema(?int $limit = 20, SchemaFormatter $formatter = new ASCIISchemaFormatter()) : void
672656
{
673-
$clone = clone $this;
674-
675657
if ($limit !== null) {
676-
$clone->limit($limit);
658+
$this->limit($limit);
677659
}
678-
$clone->load(to_output(false, Output::schema, schemaFormatter: $formatter));
660+
$this->load(to_output(false, Output::schema, schemaFormatter: $formatter));
679661

680-
$clone->run();
662+
$this->run();
681663
}
682664

683665
/**
@@ -807,17 +789,15 @@ public function rows(Transformer|Transformation $transformer) : self
807789
*/
808790
public function run(?callable $callback = null, bool|Analyze $analyze = false) : ?Report
809791
{
810-
$clone = clone $this;
811-
812792
if ($analyze === false) {
813793
$analyze = $this->context->config->analyze();
814794
}
815795

816796
$collector = new ReportCollector($analyze, $this->context->config->clock());
817797

818-
foreach ($clone->pipeline->process($clone->context) as $rows) {
798+
foreach ($this->pipeline->process($this->context) as $rows) {
819799
if ($callback !== null) {
820-
$callback($rows, $clone->context);
800+
$callback($rows, $this->context);
821801
}
822802

823803
$collector->capture($rows);
@@ -869,7 +849,7 @@ public function select(string|Reference ...$entries) : self
869849
*/
870850
public function sortBy(Reference ...$entries) : self
871851
{
872-
$this->pipeline = new LinkedPipeline(new SortingPipeline($this->pipeline, refs(...$entries)));
852+
$this->pipeline->add(new SortingProcessor(refs(...$entries)));
873853

874854
return $this;
875855
}
@@ -920,7 +900,7 @@ public function validate(Schema $schema, ?SchemaValidator $validator = null) : s
920900
*/
921901
public function void() : self
922902
{
923-
$this->pipeline = new VoidPipeline($this->pipeline);
903+
$this->pipeline->add(new VoidProcessor());
924904

925905
return $this;
926906
}
@@ -978,24 +958,18 @@ public function withEntry(string|Definition $entry, ScalarFunction|WindowFunctio
978958
{
979959
if ($reference instanceof WindowFunction) {
980960
if (\count($reference->window()->partitions())) {
981-
// When there are partitions, use PartitioningPipeline to ensure all data
982-
// from the same partition is grouped together before processing
983-
$this->pipeline = new LinkedPipeline(
984-
new PartitioningPipeline($this->pipeline, $reference->window()->partitions(), $reference->window()->order())
961+
$this->pipeline->add(
962+
new PartitioningProcessor($reference->window()->partitions(), $reference->window()->order())
985963
);
986964
} else {
987-
// When there are no partitions, collect all data and sort if needed
988965
$this->collect();
989966

990967
if (\count($reference->window()->order())) {
991968
$this->sortBy(...$reference->window()->order());
992969
}
993970
}
994971

995-
// Now wrap in WindowFunctionPipeline to apply the window function
996-
$this->pipeline = new LinkedPipeline(
997-
new WindowFunctionPipeline($this->pipeline, $entry, $reference)
998-
);
972+
$this->pipeline->add(new WindowProcessor($entry, $reference));
999973
} else {
1000974
$this->with(new ScalarFunctionTransformer($entry, $reference));
1001975
}

src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
use Flow\ETL\{DataFrame, GroupBy};
88
use Flow\ETL\Function\AggregatingFunction;
9-
use Flow\ETL\Pipeline\{GroupByPipeline, LinkedPipeline};
9+
use Flow\ETL\Processor\GroupByProcessor;
1010
use Flow\ETL\Row\Reference;
1111

1212
final readonly class GroupedDataFrame
@@ -19,14 +19,14 @@ public function aggregate(AggregatingFunction ...$aggregations) : DataFrame
1919
{
2020
$this->groupBy->aggregate(...$aggregations);
2121

22-
$pipelineSetter = function (GroupBy $groupBy) : void {
22+
$pipelineAdder = function (GroupBy $groupBy) : void {
2323
/**
2424
* @phpstan-ignore-next-line
2525
*/
26-
$this->pipeline = new LinkedPipeline(new GroupByPipeline($groupBy, $this->pipeline));
26+
$this->pipeline->add(new GroupByProcessor($groupBy));
2727
};
2828

29-
$pipelineSetter->bindTo($this->df, $this->df)($this->groupBy);
29+
$pipelineAdder->bindTo($this->df, $this->df)($this->groupBy);
3030

3131
return $this->df;
3232
}

0 commit comments

Comments
 (0)