Skip to content

Commit a237c2b

Browse files
committed
fix: partitioning documentation
- avoid removing files from other datasets using the same partitiong keys and the same output folder - add more partitioning examples - add more partitioning documentation
1 parent 1a9ed3a commit a237c2b

39 files changed

Lines changed: 487 additions & 4 deletions

File tree

documentation/components/core/partitioning.md

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,160 @@ $dataFrame->partitionBy('id'); // If IDs are unique, creates many tiny partition
107107

108108
// Good partitioning - moderate cardinality
109109
$dataFrame->partitionBy('department'); // Assuming reasonable number of departments
110-
```
110+
```
111+
112+
## Save Modes with Partitioning
113+
114+
When writing partitioned data, the save mode determines how existing partition directories are handled.
115+
116+
### Overwrite Mode
117+
118+
```php
119+
<?php
120+
121+
use function Flow\ETL\DSL\{data_frame, from_array, overwrite, ref};
122+
use function Flow\ETL\Adapter\CSV\to_csv;
123+
124+
data_frame()
125+
->read(from_array([
126+
['date' => '2024-01-01', 'amount' => 100],
127+
['date' => '2024-01-02', 'amount' => 200],
128+
]))
129+
->partitionBy(ref('date'))
130+
->mode(overwrite())
131+
->write(to_csv(__DIR__ . '/output/sales.csv'))
132+
->run();
133+
```
134+
135+
**Behavior:**
136+
- Removes ALL files within partition directories being written to
137+
- Partitions NOT in the current dataset are preserved
138+
- Running twice with the same partition values replaces the first write completely
139+
140+
**Common pitfall:** If you write two separate DataFrames to the same partitions using `overwrite()`, the second write deletes data from the first:
141+
142+
```php
143+
<?php
144+
145+
// First write - creates date=2024-01-01/sales.csv with amount=100
146+
data_frame()
147+
->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
148+
->partitionBy(ref('date'))
149+
->mode(overwrite())
150+
->write(to_csv(__DIR__ . '/output/sales.csv'))
151+
->run();
152+
153+
// Second write - DELETES the 100 and writes 200
154+
data_frame()
155+
->read(from_array([['date' => '2024-01-01', 'amount' => 200]]))
156+
->partitionBy(ref('date'))
157+
->mode(overwrite())
158+
->write(to_csv(__DIR__ . '/output/sales.csv'))
159+
->run();
160+
161+
// Result: date=2024-01-01/sales.csv contains ONLY amount=200
162+
```
163+
164+
To combine data from multiple sources into the same partition, use `append()` mode or merge data before writing.
165+
166+
### Append Mode
167+
168+
```php
169+
<?php
170+
171+
data_frame()
172+
->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
173+
->partitionBy(ref('date'))
174+
->mode(append())
175+
->write(to_csv(__DIR__ . '/output/sales.csv'))
176+
->run();
177+
```
178+
179+
**Behavior:**
180+
- Creates new files with randomized suffixes in existing partition directories
181+
- Does not remove existing files
182+
- Multiple runs accumulate files (may cause duplicates)
183+
184+
### Ignore Mode
185+
186+
```php
187+
<?php
188+
189+
data_frame()
190+
->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
191+
->partitionBy(ref('date'))
192+
->mode(ignore())
193+
->write(to_csv(__DIR__ . '/output/sales.csv'))
194+
->run();
195+
```
196+
197+
**Behavior:**
198+
- Skips writing if partition directory already exists
199+
- No error thrown, silently continues
200+
201+
### Exception If Exists Mode (Default)
202+
203+
```php
204+
<?php
205+
206+
data_frame()
207+
->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
208+
->partitionBy(ref('date'))
209+
->write(to_csv(__DIR__ . '/output/sales.csv')) // Default mode
210+
->run();
211+
```
212+
213+
**Behavior:**
214+
- Throws `RuntimeException` if any partition path already exists
215+
- Safest option to prevent accidental overwrites
216+
217+
## Reading Partitioned Data
218+
219+
Read partitioned data using glob patterns to match partition directories:
220+
221+
```php
222+
<?php
223+
224+
use function Flow\ETL\Adapter\CSV\from_csv;
225+
use function Flow\ETL\DSL\{data_frame, to_output};
226+
227+
data_frame()
228+
->read(from_csv(__DIR__ . '/output/date=*/*.csv'))
229+
->write(to_output())
230+
->run();
231+
```
232+
233+
### Partition Pruning
234+
235+
Skip entire partitions without reading their contents using `filterPartitions()`:
236+
237+
```php
238+
<?php
239+
240+
data_frame()
241+
->read(from_csv(__DIR__ . '/output/date=*/department=*/*.csv'))
242+
->filterPartitions(ref('date')->greaterThanEqual(lit('2024-01-01')))
243+
->write(to_output())
244+
->run();
245+
```
246+
247+
Unlike `filter()` which reads all data then discards non-matching rows, `filterPartitions()` evaluates partition metadata first and only reads matching partitions - significantly improving performance for large datasets.
248+
249+
### Path Partitions
250+
251+
Extract partition metadata without reading file contents using `from_path_partitions()`:
252+
253+
```php
254+
<?php
255+
256+
use function Flow\ETL\DSL\{data_frame, from_path_partitions, to_output};
257+
258+
data_frame()
259+
->read(from_path_partitions(__DIR__ . '/output/date=*/department=*/*.csv'))
260+
->write(to_output())
261+
->run();
262+
263+
// Output includes 'path' and 'partitions' columns
264+
```
265+
266+
Useful for discovering available partitions or building file manifests before processing data.

src/core/etl/src/Flow/ETL/Filesystem/FilesystemStreams.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,14 @@ public function closeStreams(Path $path) : void
4141

4242
if ($this->saveMode === SaveMode::Overwrite) {
4343
if ($fileStream->path()->partitions()->count()) {
44-
$partitionFilesPatter = \Flow\Filesystem\DSL\path($fileStream->path()->parentDirectory()->uri() . '/*', $fileStream->path()->options());
44+
$filename = \str_replace(self::FLOW_TMP_FILE_PREFIX, '', $fileStream->path()->filename());
4545

46-
foreach ($fs->list($partitionFilesPatter) as $partitionFile) {
46+
$partitionFilesPattern = \Flow\Filesystem\DSL\path(
47+
$fileStream->path()->parentDirectory()->uri() . '/' . $filename . '*.' . $fileStream->path()->extension(),
48+
$fileStream->path()->options()
49+
);
50+
51+
foreach ($fs->list($partitionFilesPattern) as $partitionFile) {
4752
if (\str_contains($partitionFile->path->path(), self::FLOW_TMP_FILE_PREFIX)) {
4853
continue;
4954
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Tests\Integration\Filesystem\FilesystemStreams\Partitioned;
6+
7+
use function Flow\ETL\DSL\overwrite;
8+
use function Flow\Filesystem\DSL\path;
9+
use Flow\ETL\Filesystem\FilesystemStreams;
10+
use Flow\ETL\Tests\Integration\Filesystem\FilesystemStreams\FilesystemStreamsTestCase;
11+
use Flow\Filesystem\Partition;
12+
13+
final class OverwriteMultipleFilesTest extends FilesystemStreamsTestCase
14+
{
15+
#[\Override]
16+
protected function tearDown() : void
17+
{
18+
parent::tearDown();
19+
$this->cleanFiles();
20+
}
21+
22+
public function test_multiple_writes_to_same_partition_with_different_filenames() : void
23+
{
24+
$this->setupFiles([
25+
__FUNCTION__ => [],
26+
]);
27+
28+
$salesStreams = $this->streams();
29+
$salesFile = $this->getPath(__FUNCTION__ . '/sales.csv');
30+
$salesStream = $salesStreams->writeTo($salesFile, partitions: [new Partition('partition', 'value')]);
31+
$salesStream->append('sales data');
32+
$salesStreams->closeStreams($salesFile);
33+
34+
$ordersStreams = $this->streams();
35+
$ordersFile = $this->getPath(__FUNCTION__ . '/orders.csv');
36+
$ordersStream = $ordersStreams->writeTo($ordersFile, partitions: [new Partition('partition', 'value')]);
37+
$ordersStream->append('orders data');
38+
$ordersStreams->closeStreams($ordersFile);
39+
40+
$files = \iterator_to_array($this->fs()->list(path($this->filesDirectory() . '/' . __FUNCTION__ . '/partition=value/*')));
41+
42+
self::assertCount(2, $files);
43+
44+
$basenames = \array_map(static fn ($file) => $file->path->basename(), $files);
45+
\sort($basenames);
46+
47+
self::assertSame(['orders.csv', 'sales.csv'], $basenames);
48+
49+
$contentByBasename = [];
50+
51+
foreach ($files as $file) {
52+
$contentByBasename[$file->path->basename()] = \file_get_contents($file->path->path());
53+
}
54+
55+
self::assertSame('sales data', $contentByBasename['sales.csv']);
56+
self::assertSame('orders data', $contentByBasename['orders.csv']);
57+
}
58+
59+
public function test_overwrite_cleans_up_randomized_files_with_same_basename() : void
60+
{
61+
$streams = $this->streams();
62+
63+
$this->setupFiles([
64+
__FUNCTION__ => [
65+
'partition=value' => [
66+
'file_abc123.csv' => 'randomized file content',
67+
'file.csv' => 'original file content',
68+
],
69+
],
70+
]);
71+
72+
$file = $this->getPath(__FUNCTION__ . '/file.csv');
73+
$fileStream = $streams->writeTo($file, partitions: [new Partition('partition', 'value')]);
74+
$fileStream->append('overwritten content');
75+
$streams->closeStreams($file);
76+
77+
$files = \iterator_to_array($this->fs()->list(path($this->filesDirectory() . '/' . __FUNCTION__ . '/partition=value/*')));
78+
79+
self::assertCount(1, $files);
80+
self::assertSame('file.csv', $files[0]->path->basename());
81+
self::assertSame('overwritten content', \file_get_contents($files[0]->path->path()));
82+
}
83+
84+
public function test_overwrite_does_not_delete_files_with_different_basename() : void
85+
{
86+
$streams = $this->streams();
87+
88+
$this->setupFiles([
89+
__FUNCTION__ => [
90+
'partition=value' => [
91+
'sales.csv' => 'sales data',
92+
],
93+
],
94+
]);
95+
96+
$ordersFile = $this->getPath(__FUNCTION__ . '/orders.csv');
97+
$ordersStream = $streams->writeTo($ordersFile, partitions: [new Partition('partition', 'value')]);
98+
$ordersStream->append('orders data');
99+
$streams->closeStreams($ordersFile);
100+
101+
$files = \iterator_to_array($this->fs()->list(path($this->filesDirectory() . '/' . __FUNCTION__ . '/partition=value/*')));
102+
103+
self::assertCount(2, $files);
104+
105+
$basenames = \array_map(static fn ($file) => $file->path->basename(), $files);
106+
\sort($basenames);
107+
108+
self::assertSame(['orders.csv', 'sales.csv'], $basenames);
109+
}
110+
111+
public function test_overwrite_replaces_file_with_same_basename() : void
112+
{
113+
$streams = $this->streams();
114+
115+
$this->setupFiles([
116+
__FUNCTION__ => [
117+
'partition=value' => [
118+
'file.csv' => 'old content',
119+
],
120+
],
121+
]);
122+
123+
$file = $this->getPath(__FUNCTION__ . '/file.csv');
124+
$fileStream = $streams->writeTo($file, partitions: [new Partition('partition', 'value')]);
125+
$fileStream->append('new content');
126+
$streams->closeStreams($file);
127+
128+
$files = \iterator_to_array($this->fs()->list(path($this->filesDirectory() . '/' . __FUNCTION__ . '/partition=value/*')));
129+
130+
self::assertCount(1, $files);
131+
self::assertSame('file.csv', $files[0]->path->basename());
132+
self::assertSame('new content', \file_get_contents($files[0]->path->path()));
133+
}
134+
135+
protected function streams() : FilesystemStreams
136+
{
137+
$streams = new FilesystemStreams($this->fstab());
138+
$streams->setMode(overwrite());
139+
140+
return $streams;
141+
}
142+
}

web/landing/.php-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
8.3
1+
8.3.30
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Skip entire partitions without reading their data using filterPartitions(). Unlike filter() which reads all data then filters, partition pruning evaluates metadata first and only reads matching partitions - dramatically improving performance for large datasets.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
priority: 2
2+
hidden: false
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
use function Flow\ETL\Adapter\CSV\to_csv;
6+
use function Flow\ETL\DSL\{append, data_frame, from_array, ref};
7+
8+
require __DIR__ . '/vendor/autoload.php';
9+
10+
data_frame()
11+
->read(from_array(
12+
[
13+
['id' => 1, 'color' => 'red', 'sku' => 'PRODUCT01'],
14+
['id' => 2, 'color' => 'red', 'sku' => 'PRODUCT02'],
15+
['id' => 3, 'color' => 'red', 'sku' => 'PRODUCT03'],
16+
['id' => 4, 'color' => 'green', 'sku' => 'PRODUCT01'],
17+
['id' => 5, 'color' => 'green', 'sku' => 'PRODUCT02'],
18+
['id' => 6, 'color' => 'green', 'sku' => 'PRODUCT03'],
19+
['id' => 7, 'color' => 'blue', 'sku' => 'PRODUCT01'],
20+
['id' => 8, 'color' => 'blue', 'sku' => 'PRODUCT02'],
21+
]
22+
))
23+
->partitionBy(ref('color'), ref('sku'))
24+
->mode(append())
25+
->write(to_csv(__DIR__ . '/output/products.csv'))
26+
->run();
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Write partitioned data with append mode. New files with randomized suffixes are created in partition directories without removing existing data. Useful for incremental updates.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
- [Partitioning](/documentation/components/core/partitioning)
2+
- [Save Mode](/documentation/components/core/save-mode)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
- [Partitioning](/documentation/components/core/partitioning)
2+
- [Save Mode](/documentation/components/core/save-mode)

0 commit comments

Comments
 (0)