Skip to content

Commit 4ee6990

Browse files
authored
refactoring: moved logic related to window from transformer to pipeline (#1973)
* refactoring: moved logic related to window from transformer to pipeline * fix: missing unit tests for WindowFunctionTransformer
1 parent 73ccd04 commit 4ee6990

6 files changed

Lines changed: 507 additions & 51 deletions

File tree

rector.tests.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@
7777
use Rector\CodingStyle\Rector\FuncCall\FunctionFirstClassCallableRector;
7878
use Rector\CodingStyle\Rector\FunctionLike\FunctionLikeToFirstClassCallableRector;
7979
use Flow\Filesystem\Path;
80+
use Flow\ETL\Window;
81+
use Flow\ETL\Function\RowNumber;
82+
use Flow\ETL\Function\Min;
83+
use Flow\ETL\Function\Max;
84+
use Flow\ETL\Function\Sum;
85+
use Flow\ETL\Function\Average;
8086

8187
return RectorConfig::configure()
8288
->withPaths([
@@ -129,6 +135,7 @@
129135
new NewObjectToFunction(FlowContext::class, 'Flow\ETL\DSL\flow_context'),
130136
new NewObjectToFunction(Schema::class, 'Flow\ETL\DSL\schema'),
131137
new NewObjectToFunction(Flow::class, 'Flow\ETL\DSL\data_frame'),
138+
new NewObjectToFunction(Window::class, 'Flow\ETL\DSL\window'),
132139

133140
// Entries
134141
new NewObjectToFunction(BooleanEntry::class, 'Flow\ETL\DSL\boolean_entry'),
@@ -197,6 +204,13 @@
197204
new NewObjectToFunction(ParquetExtractor::class, 'Flow\ETL\Adapter\Parquet\from_parquet'),
198205
new NewObjectToFunction(TextExtractor::class, 'Flow\ETL\Adapter\Text\from_text'),
199206
new NewObjectToFunction(XMLParserExtractor::class, 'Flow\ETL\Adapter\XML\from_xml'),
207+
208+
// Functions
209+
new NewObjectToFunction(RowNumber::class, 'Flow\ETL\DSL\row_number'),
210+
new NewObjectToFunction(Min::class, 'Flow\ETL\DSL\min'),
211+
new NewObjectToFunction(Max::class, 'Flow\ETL\DSL\min'),
212+
new NewObjectToFunction(Sum::class, 'Flow\ETL\DSL\sum'),
213+
new NewObjectToFunction(Average::class, 'Flow\ETL\DSL\average'),
200214
]
201215
)
202216
->withSkip([

src/core/etl/src/Flow/ETL/DataFrame.php

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
OffsetPipeline,
3131
PartitioningPipeline,
3232
SortingPipeline,
33-
VoidPipeline};
33+
VoidPipeline,
34+
WindowFunctionPipeline};
3435
use Flow\ETL\Row\{EntryReference, Formatter\ASCIISchemaFormatter, Reference, References};
3536
use Flow\ETL\Schema\{Definition, SchemaFormatter};
3637
use Flow\ETL\Schema\Validator\StrictValidator;
@@ -55,8 +56,8 @@
5556
ScalarFunctionFilterTransformer,
5657
ScalarFunctionTransformer,
5758
SelectEntriesTransformer,
58-
UntilTransformer,
59-
WindowFunctionTransformer};
59+
UntilTransformer
60+
};
6061
use Flow\Filesystem\Path\Filter;
6162
use Flow\Types\Type\AutoCaster;
6263

@@ -1007,16 +1008,24 @@ public function withEntry(string|Definition $entry, ScalarFunction|WindowFunctio
10071008
{
10081009
if ($reference instanceof WindowFunction) {
10091010
if (\count($reference->window()->partitions())) {
1010-
$this->pipeline = new LinkedPipeline(new PartitioningPipeline($this->pipeline, $reference->window()->partitions(), $reference->window()->order()));
1011+
// When there are partitions, use PartitioningPipeline to ensure all data
1012+
// from the same partition is grouped together before processing
1013+
$this->pipeline = new LinkedPipeline(
1014+
new PartitioningPipeline($this->pipeline, $reference->window()->partitions(), $reference->window()->order())
1015+
);
10111016
} else {
1017+
// When there are no partitions, collect all data and sort if needed
10121018
$this->collect();
10131019

10141020
if (\count($reference->window()->order())) {
10151021
$this->sortBy(...$reference->window()->order());
10161022
}
10171023
}
10181024

1019-
$this->pipeline->add(new WindowFunctionTransformer($entry, $reference));
1025+
// Now wrap in WindowFunctionPipeline to apply the window function
1026+
$this->pipeline = new LinkedPipeline(
1027+
new WindowFunctionPipeline($this->pipeline, $entry, $reference)
1028+
);
10201029
} else {
10211030
$this->with(new ScalarFunctionTransformer($entry, $reference));
10221031
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Pipeline;
6+
7+
use Flow\ETL\Exception\InvalidArgumentException;
8+
use Flow\ETL\{Extractor, FlowContext, Loader, Pipeline, Row, Rows, Transformer};
9+
use Flow\ETL\Function\WindowFunction;
10+
use Flow\ETL\Schema\Definition;
11+
12+
final readonly class WindowFunctionPipeline implements OverridingPipeline, Pipeline
13+
{
14+
/**
15+
* @param Definition<mixed>|string $entry
16+
*/
17+
public function __construct(
18+
private Pipeline $pipeline,
19+
private string|Definition $entry,
20+
private WindowFunction $function,
21+
) {
22+
}
23+
24+
public function add(Loader|Transformer $pipe) : Pipeline
25+
{
26+
$this->pipeline->add($pipe);
27+
28+
return $this;
29+
}
30+
31+
public function has(string $transformerClass) : bool
32+
{
33+
return $this->pipeline->has($transformerClass);
34+
}
35+
36+
public function pipelines() : array
37+
{
38+
return [$this->pipeline];
39+
}
40+
41+
public function pipes() : Pipes
42+
{
43+
return $this->pipeline->pipes();
44+
}
45+
46+
/**
47+
* @return \Generator<int, Rows>
48+
*/
49+
public function process(FlowContext $context) : \Generator
50+
{
51+
$currentPartitionKey = null;
52+
$partitionRows = [];
53+
54+
foreach ($this->pipeline->process($context) as $rows) {
55+
foreach ($rows as $row) {
56+
$partitionKey = $this->extractPartitionKey($row);
57+
58+
if ($currentPartitionKey !== null && $currentPartitionKey !== $partitionKey) {
59+
$processedRows = $this->processPartition($partitionRows, $context);
60+
61+
if ($processedRows->count() > 0) {
62+
yield $processedRows;
63+
}
64+
65+
$partitionRows = [];
66+
}
67+
68+
$partitionRows[] = $row;
69+
$currentPartitionKey = $partitionKey;
70+
}
71+
}
72+
73+
if ([] !== $partitionRows) {
74+
$processedRows = $this->processPartition($partitionRows, $context);
75+
76+
if ($processedRows->count() > 0) {
77+
yield $processedRows;
78+
}
79+
}
80+
}
81+
82+
public function source() : Extractor
83+
{
84+
return $this->pipeline->source();
85+
}
86+
87+
private function extractPartitionKey(Row $row) : string
88+
{
89+
$partitions = $this->function->window()->partitions();
90+
91+
if ([] === $partitions) {
92+
return '__single_partition__';
93+
}
94+
95+
$keyParts = [];
96+
97+
foreach ($partitions as $partition) {
98+
try {
99+
$keyParts[] = $row->valueOf($partition);
100+
} catch (InvalidArgumentException) {
101+
$keyParts[] = null;
102+
}
103+
}
104+
105+
return \serialize($keyParts);
106+
}
107+
108+
/**
109+
* @param array<Row> $rows
110+
*/
111+
private function processPartition(array $rows, FlowContext $context) : Rows
112+
{
113+
if ([] === $rows) {
114+
return new Rows();
115+
}
116+
117+
$partitionRows = new Rows(...$rows);
118+
119+
$orderBy = $this->function->window()->order();
120+
121+
if ([] !== $orderBy) {
122+
$partitionRows = $partitionRows->sortBy(...$orderBy);
123+
}
124+
125+
$processedRows = [];
126+
127+
foreach ($partitionRows as $row) {
128+
$value = $this->function->apply($row, $partitionRows);
129+
130+
$entryName = $this->entry instanceof Definition
131+
? $this->entry->entry()->name()
132+
: $this->entry;
133+
134+
$newRow = $row->add(
135+
$context->entryFactory()->create(
136+
$entryName,
137+
$value,
138+
$this->entry instanceof Definition ? $this->entry : null
139+
)
140+
);
141+
142+
$processedRows[] = $newRow;
143+
}
144+
145+
return new Rows(...$processedRows);
146+
}
147+
}

src/core/etl/src/Flow/ETL/Transformer/WindowFunctionTransformer.php

Lines changed: 0 additions & 45 deletions
This file was deleted.

0 commit comments

Comments
 (0)