Skip to content

Commit 3ed245d

Browse files
committed
Fixed batch size optimization for deeply nested pipelines
1 parent 4521da1 commit 3ed245d

17 files changed

Lines changed: 362 additions & 35 deletions

File tree

src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/RowsNormalizer/EntryNormalizer.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,18 @@
1010
use Flow\ETL\PHP\Type\Logical\{ListType, MapType, StructureType};
1111
use Flow\ETL\PHP\Type\Type;
1212
use Flow\ETL\Row\Entry;
13-
use Flow\ETL\Row\Entry\{BooleanEntry, DateTimeEntry, EnumEntry, FloatEntry, IntegerEntry, JsonEntry, ListEntry, MapEntry, StringEntry, StructureEntry, UuidEntry};
13+
use Flow\ETL\Row\Entry\{BooleanEntry,
14+
DateTimeEntry,
15+
EnumEntry,
16+
FloatEntry,
17+
IntegerEntry,
18+
JsonEntry,
19+
ListEntry,
20+
MapEntry,
21+
StringEntry,
22+
StructureEntry,
23+
UuidEntry,
24+
XMLEntry};
1425

1526
final readonly class EntryNormalizer
1627
{
@@ -50,6 +61,7 @@ public function normalize(Entry $entry) : XMLNode|XMLAttribute
5061
EnumEntry::class => XMLNode::flatNode($entry->name(), $entry->toString()),
5162
JsonEntry::class => XMLNode::flatNode($entry->name(), $entry->toString()),
5263
UuidEntry::class => XMLNode::flatNode($entry->name(), $entry->toString()),
64+
XMLEntry::class => XMLNode::flatNode($entry->name(), $entry->toString()),
5365
default => throw new InvalidArgumentException("Given entry type can't be converted to node, given entry type: " . $entry::class),
5466
};
5567
}

src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
use Flow\ETL\Exception\InvalidArgumentException;
99
use Flow\ETL\{Extractor, FlowContext, Loader, Pipeline, Transformer};
1010

11-
final readonly class BatchingPipeline implements Pipeline
11+
final readonly class BatchingPipeline implements OverridingPipeline, Pipeline
1212
{
13+
use RecursivePipelineIterator;
14+
1315
/**
1416
* @param Pipeline $pipeline
1517
* @param int<1, max> $size
@@ -35,6 +37,11 @@ public function has(string $transformerClass) : bool
3537
return $this->pipeline->has($transformerClass);
3638
}
3739

40+
public function pipelines() : array
41+
{
42+
return $this->allPipelines($this->pipeline);
43+
}
44+
3845
public function pipes() : Pipes
3946
{
4047
return $this->pipeline->pipes();

src/core/etl/src/Flow/ETL/Pipeline/CachingPipeline.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66

77
use Flow\ETL\{Cache\CacheIndex, Extractor, FlowContext, Loader, Pipeline, Transformer};
88

9-
final readonly class CachingPipeline implements Pipeline
9+
final readonly class CachingPipeline implements OverridingPipeline, Pipeline
1010
{
11+
use RecursivePipelineIterator;
12+
1113
public function __construct(private Pipeline $pipeline, private ?string $id = null)
1214
{
1315
}
@@ -24,6 +26,11 @@ public function has(string $transformerClass) : bool
2426
return $this->pipeline->has($transformerClass);
2527
}
2628

29+
public function pipelines() : array
30+
{
31+
return $this->allPipelines($this->pipeline);
32+
}
33+
2734
public function pipes() : Pipes
2835
{
2936
return $this->pipeline->pipes();

src/core/etl/src/Flow/ETL/Pipeline/CollectingPipeline.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
/**
1010
* @internal
1111
*/
12-
final readonly class CollectingPipeline implements Pipeline
12+
final readonly class CollectingPipeline implements OverridingPipeline, Pipeline
1313
{
14+
use RecursivePipelineIterator;
15+
1416
public function __construct(private Pipeline $pipeline)
1517
{
1618
}
@@ -27,6 +29,11 @@ public function has(string $transformerClass) : bool
2729
return $this->pipeline->has($transformerClass);
2830
}
2931

32+
public function pipelines() : array
33+
{
34+
return $this->allPipelines($this->pipeline);
35+
}
36+
3037
public function pipes() : Pipes
3138
{
3239
return $this->pipeline->pipes();

src/core/etl/src/Flow/ETL/Pipeline/ConstrainedPipeline.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
use Flow\ETL\{Constraint, Extractor, FlowContext, Loader, Pipeline, Transformer};
88
use Flow\ETL\Exception\{ConstraintViolationException, InvalidArgumentException};
99

10-
final class ConstrainedPipeline implements Pipeline
10+
final class ConstrainedPipeline implements OverridingPipeline, Pipeline
1111
{
12+
use RecursivePipelineIterator;
13+
1214
private int $rowIndex = 0;
1315

1416
/**
@@ -38,6 +40,14 @@ public function has(string $transformerClass) : bool
3840
return $this->pipeline->has($transformerClass);
3941
}
4042

43+
/**
44+
* @return array<Pipeline>
45+
*/
46+
public function pipelines() : array
47+
{
48+
return $this->allPipelines($this->pipeline);
49+
}
50+
4151
public function pipes() : Pipes
4252
{
4353
return $this->pipeline->pipes();

src/core/etl/src/Flow/ETL/Pipeline/GroupByPipeline.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66

77
use Flow\ETL\{Extractor, FlowContext, GroupBy, Loader, Pipeline, Transformer};
88

9-
final readonly class GroupByPipeline implements Pipeline
9+
final readonly class GroupByPipeline implements OverridingPipeline, Pipeline
1010
{
11+
use RecursivePipelineIterator;
12+
1113
public function __construct(public GroupBy $groupBy, private Pipeline $pipeline)
1214
{
1315
}
@@ -24,6 +26,11 @@ public function has(string $transformerClass) : bool
2426
return $this->pipeline->has($transformerClass);
2527
}
2628

29+
public function pipelines() : array
30+
{
31+
return $this->allPipelines($this->pipeline);
32+
}
33+
2734
public function pipes() : Pipes
2835
{
2936
return $this->pipeline->pipes();

src/core/etl/src/Flow/ETL/Pipeline/HashJoinPipeline.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
use Flow\ETL\Pipeline\HashJoin\HashTable;
1313
use Flow\ETL\Row\Entry;
1414

15-
final readonly class HashJoinPipeline implements Pipeline
15+
final readonly class HashJoinPipeline implements OverridingPipeline, Pipeline
1616
{
17+
use RecursivePipelineIterator;
18+
1719
private Extractor $extractor;
1820

1921
public function __construct(
@@ -38,6 +40,11 @@ public function has(string $transformerClass) : bool
3840
return $this->left->has($transformerClass);
3941
}
4042

43+
public function pipelines() : array
44+
{
45+
return $this->allPipelines($this->left);
46+
}
47+
4148
public function pipes() : Pipes
4249
{
4350
return $this->left->pipes();

src/core/etl/src/Flow/ETL/Pipeline/LinkedPipeline.php

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
final readonly class LinkedPipeline implements OverridingPipeline, Pipeline
1818
{
19+
use RecursivePipelineIterator;
20+
1921
private Pipeline $nextPipeline;
2022

2123
public function __construct(
@@ -41,18 +43,7 @@ public function has(string $transformerClass) : bool
4143
*/
4244
public function pipelines() : array
4345
{
44-
$pipelines = [];
45-
46-
if ($this->pipeline instanceof OverridingPipeline) {
47-
$pipelines = $this->pipeline->pipelines();
48-
}
49-
50-
$pipelines[] = $this->pipeline;
51-
52-
// if ($this->nextPipeline instanceof OverridingPipeline) {
53-
// $pipelines = \array_merge($pipelines, $this->nextPipeline->pipelines());
54-
// }
55-
46+
$pipelines = $this->allPipelines($this->pipeline);
5647
$pipelines[] = $this->nextPipeline;
5748

5849
return $pipelines;

src/core/etl/src/Flow/ETL/Pipeline/Optimizer/BatchSizeOptimization.php

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,19 @@ final class BatchSizeOptimization implements Optimization
3333
* @var array<class-string<Loader>>
3434
*/
3535
private array $supportedLoaders = [
36-
\Flow\ETL\Adapter\Doctrine\DbalLoader::class,
37-
\Flow\ETL\Adapter\Elasticsearch\ElasticsearchPHP\ElasticsearchLoader::class,
38-
\Flow\ETL\Adapter\Meilisearch\MeilisearchPHP\MeilisearchLoader::class,
36+
'Flow\ETL\Adapter\Doctrine\DbalLoader',
37+
'Flow\ETL\Adapter\Elasticsearch\ElasticsearchPHP\ElasticsearchLoader',
38+
'Flow\ETL\Adapter\Meilisearch\MeilisearchPHP\MeilisearchLoader',
3939
];
4040

4141
/**
4242
* @param int<1, max> $batchSize
4343
*/
44-
public function __construct(private readonly int $batchSize = 1000)
44+
public function __construct(private readonly int $batchSize = 1000, ?array $supportedLoaders = null)
4545
{
46+
if ($supportedLoaders !== null) {
47+
$this->supportedLoaders = $supportedLoaders;
48+
}
4649
}
4750

4851
public function isFor(Loader|Transformer $element, Pipeline $pipeline) : bool

src/core/etl/src/Flow/ETL/Pipeline/PartitioningPipeline.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use Flow\ETL\Row\Reference;
1919
use Flow\Filesystem\Partition;
2020

21-
final readonly class PartitioningPipeline implements Pipeline
21+
final readonly class PartitioningPipeline implements OverridingPipeline, Pipeline
2222
{
2323
private Algorithm $hashAlgorithm;
2424

@@ -52,6 +52,30 @@ public function has(string $transformerClass) : bool
5252
return $this->pipeline->has($transformerClass);
5353
}
5454

55+
/**
56+
* @return array<Pipeline>
57+
*/
58+
public function pipelines() : array
59+
{
60+
$pipelines = [];
61+
62+
$recursivelyAddPipelines = function (Pipeline $pipeline, array &$pipelines) use (&$recursivelyAddPipelines) : void {
63+
if ($pipeline instanceof OverridingPipeline) {
64+
$pipelines[] = $pipeline;
65+
66+
foreach ($pipeline->pipelines() as $p) {
67+
$recursivelyAddPipelines($p, $pipelines);
68+
}
69+
} else {
70+
$pipelines[] = $pipeline;
71+
}
72+
};
73+
74+
$pipelines[] = $this->pipeline;
75+
76+
return $pipelines;
77+
}
78+
5579
public function pipes() : Pipes
5680
{
5781
return $this->pipeline->pipes();

0 commit comments

Comments
 (0)