Skip to content

Commit 19106a4

Browse files
authored
Allow to duplicate row (#1541)
* Allow to duplicate rows based on condition, with applying additional transformations on them * Updated dsl definitions * Simplified assertions in TransformTest
1 parent a207b9c commit 19106a4

45 files changed

Lines changed: 667 additions & 41 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/core/etl/src/Flow/ETL/DSL/functions.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@
131131
Rows,
132132
Transformation,
133133
Transformer,
134-
Window};
134+
Window,
135+
WithEntry};
135136
use Flow\Filesystem\Stream\Mode;
136137
use Flow\Filesystem\{Filesystem, Local\NativeLocalFilesystem, Partition, Partitions, Path};
137138
use Flow\Serializer\{NativePHPSerializer, Serializer};
@@ -1698,3 +1699,9 @@ function caster(?Options $options = null) : Caster
16981699
{
16991700
return Caster::default($options ?? caster_options());
17001701
}
1702+
1703+
#[DocumentationDSL(module: Module::CORE, type: DSLType::HELPER)]
1704+
function with_entry(string $name, ScalarFunction $function) : WithEntry
1705+
{
1706+
return new WithEntry($name, $function);
1707+
}

src/core/etl/src/Flow/ETL/DataFrame.php

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@
2626
SortingPipeline,
2727
VoidPipeline};
2828
use Flow\ETL\Row\{Reference, References, Schema, Schema\Definition};
29-
use Flow\ETL\Transformer\{
30-
AutoCastTransformer,
29+
use Flow\ETL\Transformer\{AutoCastTransformer,
3130
CallbackRowTransformer,
3231
CrossJoinRowsTransformer,
3332
DropDuplicatesTransformer,
3433
DropEntriesTransformer,
3534
DropPartitionsTransformer,
35+
DuplicateRowTransformer,
3636
EntryNameStyleConverterTransformer,
3737
JoinEachRowsTransformer,
3838
LimitTransformer,
@@ -46,8 +46,7 @@
4646
ScalarFunctionTransformer,
4747
SelectEntriesTransformer,
4848
UntilTransformer,
49-
WindowFunctionTransformer
50-
};
49+
WindowFunctionTransformer};
5150
use Flow\Filesystem\Path\Filter;
5251

5352
final class DataFrame
@@ -259,6 +258,13 @@ public function dropPartitions(bool $dropPartitionColumns = false) : self
259258
return $this;
260259
}
261260

261+
public function duplicateRow(mixed $condition, WithEntry ...$entries) : self
262+
{
263+
$this->pipeline->add(new DuplicateRowTransformer($condition, ...$entries));
264+
265+
return $this;
266+
}
267+
262268
/**
263269
* Be aware that fetch is not memory safe and will load all rows into memory.
264270
* If you want to safely iterate over Rows use oe of the following methods:.
@@ -776,7 +782,7 @@ public function sortBy(Reference ...$entries) : self
776782
*
777783
* @lazy
778784
*/
779-
public function transform(Transformer|Transformation|Transformations $transformer) : self
785+
public function transform(Transformer|Transformation|Transformations|WithEntry $transformer) : self
780786
{
781787
return $this->with($transformer);
782788
}
@@ -823,7 +829,7 @@ public function void() : self
823829
/**
824830
* @lazy
825831
*/
826-
public function with(Transformer|Transformation|Transformations $transformer) : self
832+
public function with(Transformer|Transformation|Transformations|WithEntry $transformer) : self
827833
{
828834
if ($transformer instanceof Transformer) {
829835
$this->pipeline->add($transformer);
@@ -837,18 +843,28 @@ public function with(Transformer|Transformation|Transformations $transformer) :
837843
return $this;
838844
}
839845

846+
if ($transformer instanceof WithEntry) {
847+
$this->withEntry($transformer->name, $transformer->function);
848+
849+
return $this;
850+
}
851+
840852
return $transformer->transform($this);
841853
}
842854

843855
/**
844856
* @lazy
845857
*
846-
* @param array<string, ScalarFunction|WindowFunction> $references
858+
* @param array<string, ScalarFunction|WindowFunction|WithEntry> $references
847859
*/
848860
public function withEntries(array $references) : self
849861
{
850862
foreach ($references as $entryName => $ref) {
851-
$this->withEntry($entryName, $ref);
863+
if ($ref instanceof WithEntry) {
864+
$this->withEntry($ref->name, $ref->function);
865+
} else {
866+
$this->withEntry($entryName, $ref);
867+
}
852868
}
853869

854870
return $this;

src/core/etl/src/Flow/ETL/PHP/Type/Native/BooleanType.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public function isValid(mixed $value) : bool
6464
return \is_bool($value);
6565
}
6666

67-
public function makeNullable(bool $nullable) : Type
67+
public function makeNullable(bool $nullable) : self
6868
{
6969
return new self($nullable);
7070
}

src/core/etl/src/Flow/ETL/PHP/Type/Native/IntegerType.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public function isValid(mixed $value) : bool
6969
return \is_int($value);
7070
}
7171

72-
public function makeNullable(bool $nullable) : Type
72+
public function makeNullable(bool $nullable) : self
7373
{
7474
return new self($nullable);
7575
}

src/core/etl/src/Flow/ETL/PHP/Type/Native/StringType.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public function isValid(mixed $value) : bool
6464
return \is_string($value);
6565
}
6666

67-
public function makeNullable(bool $nullable) : Type
67+
public function makeNullable(bool $nullable) : self
6868
{
6969
return new self($nullable);
7070
}

src/core/etl/src/Flow/ETL/Row.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public function add(Entry ...$entries) : self
4242
return new self($this->entries->add(...$entries));
4343
}
4444

45+
public function duplicate() : self
46+
{
47+
return new self($this->entries()->duplicate());
48+
}
49+
4550
public function entries() : Entries
4651
{
4752
return $this->entries;

src/core/etl/src/Flow/ETL/Row/Entries.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,17 @@ public function count() : int
7878
return \count($this->entries);
7979
}
8080

81+
public function duplicate() : self
82+
{
83+
$entries = [];
84+
85+
foreach ($this->all() as $entry) {
86+
$entries[$entry->name()] = $entry->duplicate();
87+
}
88+
89+
return self::recreate($entries);
90+
}
91+
8192
/**
8293
* @throws InvalidArgumentException
8394
*

src/core/etl/src/Flow/ETL/Row/Entry.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ public function __toString() : string;
1717

1818
public function definition() : Definition;
1919

20+
/**
21+
* @return Entry<TValue, TType>
22+
*/
23+
public function duplicate() : self;
24+
2025
public function is(string|Reference $name) : bool;
2126

2227
/**

src/core/etl/src/Flow/ETL/Row/Entry/BooleanEntry.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@ final class BooleanEntry implements Entry
2020

2121
private Metadata $metadata;
2222

23-
/**
24-
* @var Type<?bool>
25-
*/
26-
private readonly Type $type;
23+
private readonly BooleanType $type;
2724

2825
/**
2926
* @throws InvalidArgumentException
@@ -49,6 +46,11 @@ public function definition() : Definition
4946
return new Definition($this->name, $this->type, $this->metadata);
5047
}
5148

49+
public function duplicate() : Entry
50+
{
51+
return new self($this->name, $this->value, $this->type, $this->metadata);
52+
}
53+
5254
public function is(string|Reference $name) : bool
5355
{
5456
if ($name instanceof Reference) {

src/core/etl/src/Flow/ETL/Row/Entry/DateEntry.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ public function definition() : Definition
6262
return new Definition($this->name, $this->type, $this->metadata);
6363
}
6464

65+
public function duplicate() : Entry
66+
{
67+
return new self($this->name, $this->value ? clone $this->value : null, $this->type, $this->metadata);
68+
}
69+
6570
public function is(string|Reference $name) : bool
6671
{
6772
if ($name instanceof Reference) {

0 commit comments

Comments
 (0)