Skip to content

Commit e9ae6ed

Browse files
committed
Added Select / Drop reusable transformations:
1 parent 076032b commit e9ae6ed

7 files changed

Lines changed: 450 additions & 0 deletions

File tree

documentation/components/core/core.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ For detailed information about specific DataFrame operations, see the following
7979

8080
### Core Operations
8181
- **[Building Blocks](building-blocks.md)** - Understanding Rows, Entries, and basic data structures
82+
- **[Transformations](transformations.md)** - Reusable DataFrame transformations and the Transformation interface
8283
- **[Select/Drop](select-drop.md)** - Column selection and removal
8384
- **[Rename](rename.md)** - Column renaming strategies
8485
- **[Map](map.md)** - Row transformations and data mapping
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
# Transformations
2+
3+
- [⬅️️ Back](/documentation/components/core/core.md)
4+
5+
## Introduction
6+
7+
Transformations are a powerful abstraction in Flow PHP that allow you to modify DataFrames in a composable and reusable
8+
way. Unlike Transformers which operate on individual Rows, Transformations work at the DataFrame level, providing access
9+
to the full processing pipeline.
10+
11+
Every Transformation implements the `Transformation` interface with a single method:
12+
13+
```php
14+
interface Transformation
15+
{
16+
public function transform(DataFrame $dataFrame): DataFrame;
17+
}
18+
```
19+
20+
## Using Transformations
21+
22+
Transformations can be applied to DataFrames using two methods:
23+
24+
- `with()` - Applies one or more transformations
25+
- `transform()` - Alias for `with()`, provides semantic clarity
26+
27+
Both methods accept `Transformation` objects directly or through convenient DSL functions.
28+
29+
```php
30+
use function Flow\ETL\DSL\{df, from_array, select, drop};
31+
32+
// Using with()
33+
df()
34+
->read(from_array([/* ... */]))
35+
->with(select('id', 'name'))
36+
->write(to_output())
37+
->run();
38+
39+
// Using transform() - identical behavior
40+
df()
41+
->read(from_array([/* ... */]))
42+
->transform(drop('temporary_column'))
43+
->write(to_output())
44+
->run();
45+
```
46+
47+
## Available Transformations
48+
49+
### Select
50+
51+
Select specific columns from the DataFrame, keeping only the columns you need.
52+
53+
```php
54+
use function Flow\ETL\DSL\{df, from_array, select, ref};
55+
56+
// Select columns by name
57+
df()
58+
->read(from_array([
59+
['id' => 1, 'name' => 'Alice', 'age' => 25, 'city' => 'New York'],
60+
['id' => 2, 'name' => 'Bob', 'age' => 30, 'city' => 'Los Angeles'],
61+
]))
62+
->with(select('id', 'name'))
63+
->write(to_output())
64+
->run();
65+
66+
// Select using References for more control
67+
df()
68+
->read(from_array([/* ... */]))
69+
->with(select(ref('id'), ref('city')))
70+
->write(to_output())
71+
->run();
72+
```
73+
74+
### Drop
75+
76+
Remove unwanted columns from the DataFrame, keeping all other columns.
77+
78+
```php
79+
use function Flow\ETL\DSL\{df, from_array, drop, ref};
80+
81+
// Drop columns by name
82+
df()
83+
->read(from_array([
84+
['id' => 1, 'password' => 'secret', 'name' => 'Alice'],
85+
['id' => 2, 'password' => 'hidden', 'name' => 'Bob'],
86+
]))
87+
->with(drop('password'))
88+
->write(to_output())
89+
->run();
90+
91+
// Drop using References
92+
df()
93+
->read(from_array([/* ... */]))
94+
->with(drop(ref('temp_column'), ref('debug_info')))
95+
->write(to_output())
96+
->run();
97+
```
98+
99+
### Batch Size
100+
101+
Control memory usage by setting the batch size for processing. Smaller batch sizes reduce memory consumption when
102+
processing large datasets.
103+
104+
```php
105+
use function Flow\ETL\DSL\{df, from_csv, batch_size};
106+
107+
// Process large CSV file in batches of 100 rows
108+
df()
109+
->read(from_csv('huge_file.csv'))
110+
->with(batch_size(100))
111+
->write(to_database('users'))
112+
->run();
113+
```
114+
115+
### Add Row Index
116+
117+
Add an index column to each row, useful for tracking row position or creating unique identifiers.
118+
119+
```php
120+
use function Flow\ETL\DSL\{df, from_array, add_row_index};
121+
use Flow\ETL\Transformation\AddRowIndex\StartFrom;
122+
123+
// Add default index starting from 0
124+
df()
125+
->read(from_array([
126+
['name' => 'Alice'],
127+
['name' => 'Bob'],
128+
]))
129+
->with(add_row_index())
130+
->write(to_output())
131+
->run();
132+
// Output: [['index' => 0, 'name' => 'Alice'], ['index' => 1, 'name' => 'Bob']]
133+
134+
// Custom column name and start from 1
135+
df()
136+
->read(from_array([/* ... */]))
137+
->with(add_row_index('row_number', StartFrom::ONE))
138+
->write(to_output())
139+
->run();
140+
```
141+
142+
### Limit
143+
144+
Restrict the number of rows processed, useful for debugging or sampling data.
145+
146+
```php
147+
use function Flow\ETL\DSL\{df, from_database, limit};
148+
149+
// Process only first 1000 rows
150+
df()
151+
->read(from_database('large_table'))
152+
->with(limit(1000))
153+
->write(to_csv('sample.csv'))
154+
->run();
155+
156+
// Remove limit (process all rows)
157+
df()
158+
->read(from_array([/* ... */]))
159+
->with(limit(null))
160+
->write(to_output())
161+
->run();
162+
```
163+
164+
### Mask Columns
165+
166+
Replace column values with a mask string, useful for hiding sensitive information.
167+
168+
```php
169+
use function Flow\ETL\DSL\{df, from_array, mask_columns};
170+
171+
// Mask sensitive columns with default mask
172+
df()
173+
->read(from_array([
174+
['name' => 'Alice', 'ssn' => '123-45-6789', 'salary' => 50000],
175+
['name' => 'Bob', 'ssn' => '987-65-4321', 'salary' => 60000],
176+
]))
177+
->with(mask_columns(['ssn', 'salary']))
178+
->write(to_output())
179+
->run();
180+
// Output: [['name' => 'Alice', 'ssn' => '******', 'salary' => '******'], ...]
181+
182+
// Use custom mask
183+
df()
184+
->read(from_array([/* ... */]))
185+
->with(mask_columns(['credit_card'], '[REDACTED]'))
186+
->write(to_output())
187+
->run();
188+
```
189+
190+
## Chaining Transformations
191+
192+
Transformations can be chained together to create complex data processing pipelines:
193+
194+
```php
195+
use function Flow\ETL\DSL\{df, from_csv, select, add_row_index, limit, batch_size};
196+
197+
df()
198+
->read(from_csv('users.csv'))
199+
->with(select('id', 'name', 'email')) // Keep only needed columns
200+
->with(add_row_index('row_num')) // Add row numbers
201+
->with(limit(1000)) // Process only first 1000
202+
->with(batch_size(50)) // Process in batches of 50
203+
->write(to_json('users_sample.json'))
204+
->run();
205+
```
206+
207+
## Using with to_transformation Loader
208+
209+
The `to_transformation` loader allows you to apply transformations as part of the loading phase, enabling complex ETL
210+
patterns:
211+
212+
```php
213+
use function Flow\ETL\DSL\{df, from_array, to_transformation, to_csv, select};
214+
215+
// Apply transformation before loading
216+
df()
217+
->read(from_array([/* ... */]))
218+
->write(
219+
to_transformation(
220+
select('id', 'name'), // Transform data
221+
to_csv('output.csv') // Then write to CSV
222+
)
223+
)
224+
->run();
225+
```
226+
227+
This pattern is particularly useful when you need to:
228+
229+
- Apply different transformations to the same data for multiple outputs
230+
- Create transformation pipelines that can be reused
231+
- Separate transformation logic from extraction and loading
232+
233+
## Creating Custom Transformations
234+
235+
You can create custom transformations by implementing the `Transformation` interface:
236+
237+
```php
238+
use Flow\ETL\{DataFrame, Transformation};
239+
240+
final class UppercaseNames implements Transformation
241+
{
242+
public function transform(DataFrame $dataFrame): DataFrame
243+
{
244+
return $dataFrame->withEntry(
245+
'name',
246+
ref('name')->upper()
247+
);
248+
}
249+
}
250+
251+
// Use custom transformation
252+
df()
253+
->read(from_array([/* ... */]))
254+
->with(new UppercaseNames())
255+
->write(to_output())
256+
->run();
257+
```

src/core/etl/src/Flow/ETL/DSL/functions.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@
169169
use Flow\ETL\Schema\Metadata;
170170
use Flow\ETL\Schema\Validator\{EvolvingValidator, SelectiveValidator, StrictValidator};
171171
use Flow\ETL\Time\{Duration, Sleep, SystemSleep};
172+
use Flow\ETL\Transformation\AddRowIndex\StartFrom;
173+
use Flow\ETL\Transformation\{AddRowIndex, BatchSize, Drop, Limit, MaskColumns, Select};
172174
use Flow\ETL\Transformer\OrderEntries\{CombinedComparator, Comparator, NameComparator, Order, TypeComparator, TypePriorities};
173175
use Flow\ETL\Transformer\Rename\{RenameCaseEntryStrategy, RenameReplaceEntryStrategy};
174176
use Flow\Filesystem\{Filesystem, Local\NativeLocalFilesystem, Partition, Partitions, Path};
@@ -1065,6 +1067,48 @@ function refs(string|Reference ...$entries) : References
10651067
return new References(...$entries);
10661068
}
10671069

1070+
#[DocumentationDSL(module: Module::CORE, type: DSLType::TRANSFORMER)]
1071+
function select(string|Reference ...$entries) : Select
1072+
{
1073+
return new Select(...$entries);
1074+
}
1075+
1076+
#[DocumentationDSL(module: Module::CORE, type: DSLType::TRANSFORMER)]
1077+
function drop(string|Reference ...$entries) : Drop
1078+
{
1079+
return new Drop(...$entries);
1080+
}
1081+
1082+
#[DocumentationDSL(module: Module::CORE, type: DSLType::TRANSFORMER)]
1083+
function add_row_index(string $column = 'index', StartFrom $startFrom = StartFrom::ZERO) : AddRowIndex
1084+
{
1085+
return new AddRowIndex($column, $startFrom);
1086+
}
1087+
1088+
/**
1089+
* @param int<1, max> $size
1090+
*/
1091+
#[DocumentationDSL(module: Module::CORE, type: DSLType::TRANSFORMER)]
1092+
function batch_size(int $size) : BatchSize
1093+
{
1094+
return new BatchSize($size);
1095+
}
1096+
1097+
#[DocumentationDSL(module: Module::CORE, type: DSLType::TRANSFORMER)]
1098+
function limit(?int $limit) : Limit
1099+
{
1100+
return new Limit($limit);
1101+
}
1102+
1103+
/**
1104+
* @param array<int, string> $columns
1105+
*/
1106+
#[DocumentationDSL(module: Module::CORE, type: DSLType::TRANSFORMER)]
1107+
function mask_columns(array $columns = [], string $mask = '******') : MaskColumns
1108+
{
1109+
return new MaskColumns($columns, $mask);
1110+
}
1111+
10681112
#[DocumentationDSL(module: Module::CORE, type: DSLType::SCALAR_FUNCTION)]
10691113
function optional(ScalarFunction $function) : Optional
10701114
{
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Transformation;
6+
7+
use Flow\ETL\{DataFrame, Transformation};
8+
use Flow\ETL\Row\{Reference, References};
9+
10+
final readonly class Drop implements Transformation
11+
{
12+
private References $references;
13+
14+
public function __construct(string|Reference ...$entries)
15+
{
16+
$this->references = References::init(...$entries);
17+
}
18+
19+
public function transform(DataFrame $dataFrame) : DataFrame
20+
{
21+
return $dataFrame->drop(...$this->references->all());
22+
}
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Transformation;
6+
7+
use Flow\ETL\{DataFrame, Transformation};
8+
use Flow\ETL\Row\{Reference, References};
9+
10+
final readonly class Select implements Transformation
11+
{
12+
private References $references;
13+
14+
public function __construct(string|Reference ...$entries)
15+
{
16+
$this->references = References::init(...$entries);
17+
}
18+
19+
public function transform(DataFrame $dataFrame) : DataFrame
20+
{
21+
return $dataFrame->select(...$this->references->all());
22+
}
23+
}

0 commit comments

Comments
 (0)