Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions rector.tests.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
use Rector\CodingStyle\Rector\FuncCall\FunctionFirstClassCallableRector;
use Rector\CodingStyle\Rector\FunctionLike\FunctionLikeToFirstClassCallableRector;
use Flow\Filesystem\Path;
use Flow\ETL\Window;
use Flow\ETL\Function\RowNumber;
use Flow\ETL\Function\Min;
use Flow\ETL\Function\Max;
use Flow\ETL\Function\Sum;
use Flow\ETL\Function\Average;

return RectorConfig::configure()
->withPaths([
Expand Down Expand Up @@ -129,6 +135,7 @@
new NewObjectToFunction(FlowContext::class, 'Flow\ETL\DSL\flow_context'),
new NewObjectToFunction(Schema::class, 'Flow\ETL\DSL\schema'),
new NewObjectToFunction(Flow::class, 'Flow\ETL\DSL\data_frame'),
new NewObjectToFunction(Window::class, 'Flow\ETL\DSL\window'),

// Entries
new NewObjectToFunction(BooleanEntry::class, 'Flow\ETL\DSL\boolean_entry'),
Expand Down Expand Up @@ -197,6 +204,13 @@
new NewObjectToFunction(ParquetExtractor::class, 'Flow\ETL\Adapter\Parquet\from_parquet'),
new NewObjectToFunction(TextExtractor::class, 'Flow\ETL\Adapter\Text\from_text'),
new NewObjectToFunction(XMLParserExtractor::class, 'Flow\ETL\Adapter\XML\from_xml'),

// Functions: each function class is rewritten to the DSL helper of the same name.
new NewObjectToFunction(RowNumber::class, 'Flow\ETL\DSL\row_number'),
new NewObjectToFunction(Min::class, 'Flow\ETL\DSL\min'),
new NewObjectToFunction(Max::class, 'Flow\ETL\DSL\max'),
new NewObjectToFunction(Sum::class, 'Flow\ETL\DSL\sum'),
new NewObjectToFunction(Average::class, 'Flow\ETL\DSL\average'),
]
)
->withSkip([
Expand Down
19 changes: 14 additions & 5 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
OffsetPipeline,
PartitioningPipeline,
SortingPipeline,
VoidPipeline};
VoidPipeline,
WindowFunctionPipeline};
use Flow\ETL\Row\{EntryReference, Formatter\ASCIISchemaFormatter, Reference, References};
use Flow\ETL\Schema\{Definition, SchemaFormatter};
use Flow\ETL\Schema\Validator\StrictValidator;
Expand All @@ -55,8 +56,8 @@
ScalarFunctionFilterTransformer,
ScalarFunctionTransformer,
SelectEntriesTransformer,
UntilTransformer,
WindowFunctionTransformer};
UntilTransformer
};
use Flow\Filesystem\Path\Filter;
use Flow\Types\Type\AutoCaster;

Expand Down Expand Up @@ -1007,16 +1008,24 @@ public function withEntry(string|Definition $entry, ScalarFunction|WindowFunctio
{
if ($reference instanceof WindowFunction) {
if (\count($reference->window()->partitions())) {
$this->pipeline = new LinkedPipeline(new PartitioningPipeline($this->pipeline, $reference->window()->partitions(), $reference->window()->order()));
// When there are partitions, use PartitioningPipeline to ensure all data
// from the same partition is grouped together before processing
$this->pipeline = new LinkedPipeline(
new PartitioningPipeline($this->pipeline, $reference->window()->partitions(), $reference->window()->order())
);
} else {
// When there are no partitions, collect all data and sort if needed
$this->collect();

if (\count($reference->window()->order())) {
$this->sortBy(...$reference->window()->order());
}
}

$this->pipeline->add(new WindowFunctionTransformer($entry, $reference));
// Now wrap in WindowFunctionPipeline to apply the window function
$this->pipeline = new LinkedPipeline(
new WindowFunctionPipeline($this->pipeline, $entry, $reference)
);
} else {
$this->with(new ScalarFunctionTransformer($entry, $reference));
}
Expand Down
147 changes: 147 additions & 0 deletions src/core/etl/src/Flow/ETL/Pipeline/WindowFunctionPipeline.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Pipeline;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\{Extractor, FlowContext, Loader, Pipeline, Row, Rows, Transformer};
use Flow\ETL\Function\WindowFunction;
use Flow\ETL\Schema\Definition;

/**
 * Pipeline decorator that evaluates a window function over the rows produced by
 * the wrapped pipeline and adds the result to every row as a new entry.
 *
 * Incoming rows are buffered while their partition key stays the same. When the
 * key changes, or the input ends, the buffered rows are sorted by the window's
 * ordering (if any), passed through the function and yielded as one Rows batch.
 *
 * Partitions are detected only by comparing the keys of consecutive rows, so rows
 * that share a key but do not arrive next to each other are processed as separate
 * partitions.
 */
final readonly class WindowFunctionPipeline implements OverridingPipeline, Pipeline
{
    /**
     * @param Pipeline $pipeline pipeline whose output rows are fed to the window function
     * @param Definition<mixed>|string $entry name (or schema definition) of the entry that receives the function result
     * @param WindowFunction $function window function applied to every row of a partition
     */
    public function __construct(
        private Pipeline $pipeline,
        private string|Definition $entry,
        private WindowFunction $function,
    ) {
    }

    /**
     * Forwards the pipe to the wrapped pipeline and returns this decorator.
     *
     * NOTE(review): the pipe is registered on the wrapped pipeline, so it presumably
     * runs before the window function is evaluated - confirm this is what callers expect.
     */
    public function add(Loader|Transformer $pipe) : Pipeline
    {
        $this->pipeline->add($pipe);

        return $this;
    }

    /**
     * Delegates the lookup to the wrapped pipeline.
     */
    public function has(string $transformerClass) : bool
    {
        return $this->pipeline->has($transformerClass);
    }

    /**
     * Returns the single pipeline wrapped by this decorator.
     */
    public function pipelines() : array
    {
        return [$this->pipeline];
    }

    /**
     * Returns the pipes of the wrapped pipeline.
     */
    public function pipes() : Pipes
    {
        return $this->pipeline->pipes();
    }

    /**
     * Streams the wrapped pipeline and yields one Rows batch per detected partition,
     * each row extended with the window function result.
     *
     * @return \Generator<int, Rows>
     */
    public function process(FlowContext $context) : \Generator
    {
        $currentPartitionKey = null;
        $partitionRows = [];

        foreach ($this->pipeline->process($context) as $rows) {
            foreach ($rows as $row) {
                $partitionKey = $this->extractPartitionKey($row);

                // A different key than the previous row closes the buffered partition.
                if (null !== $currentPartitionKey && $currentPartitionKey !== $partitionKey) {
                    $processedRows = $this->processPartition($partitionRows, $context);

                    // Plain `yield` (not `yield from`) keeps the generator keys sequential.
                    if ($processedRows->count() > 0) {
                        yield $processedRows;
                    }

                    $partitionRows = [];
                }

                $partitionRows[] = $row;
                $currentPartitionKey = $partitionKey;
            }
        }

        // Emit whatever is still buffered once the input is exhausted.
        if ([] !== $partitionRows) {
            $processedRows = $this->processPartition($partitionRows, $context);

            if ($processedRows->count() > 0) {
                yield $processedRows;
            }
        }
    }

    /**
     * Returns the source extractor of the wrapped pipeline.
     */
    public function source() : Extractor
    {
        return $this->pipeline->source();
    }

    /**
     * Builds a string key identifying the partition the row belongs to.
     *
     * Without partition references every row gets the same constant key. Otherwise
     * the key is the serialized list of the row's values for each reference.
     */
    private function extractPartitionKey(Row $row) : string
    {
        $partitions = $this->function->window()->partitions();

        if ([] === $partitions) {
            return '__single_partition__';
        }

        $keyParts = [];

        foreach ($partitions as $partition) {
            try {
                $keyParts[] = $row->valueOf($partition);
            } catch (InvalidArgumentException) {
                // Deliberate fallback: a value that cannot be read contributes null to the key
                // instead of aborting the whole pipeline.
                $keyParts[] = null;
            }
        }

        return \serialize($keyParts);
    }

    /**
     * Applies the window function to every row of one partition.
     *
     * The rows are sorted by the window's ordering first (when one is defined), and the
     * function receives both the current row and the full, sorted partition.
     *
     * @param array<Row> $rows rows belonging to a single partition
     *
     * @return Rows the partition rows, each with the result entry added; empty for empty input
     */
    private function processPartition(array $rows, FlowContext $context) : Rows
    {
        if ([] === $rows) {
            return new Rows();
        }

        $partitionRows = new Rows(...$rows);

        $orderBy = $this->function->window()->order();

        if ([] !== $orderBy) {
            $partitionRows = $partitionRows->sortBy(...$orderBy);
        }

        // The target entry name and definition are the same for every row, so resolve them once.
        $entryName = $this->entry instanceof Definition
            ? $this->entry->entry()->name()
            : $this->entry;
        $definition = $this->entry instanceof Definition ? $this->entry : null;

        $processedRows = [];

        foreach ($partitionRows as $row) {
            $value = $this->function->apply($row, $partitionRows);

            $processedRows[] = $row->add(
                $context->entryFactory()->create($entryName, $value, $definition)
            );
        }

        return new Rows(...$processedRows);
    }
}

This file was deleted.

Loading