Skip to content

Commit 6158974

Browse files
authored
Partitioning should not overwrite schema (#1752)
1 parent e383580 commit 6158974

3 files changed

Lines changed: 236 additions & 7 deletions

File tree

src/core/etl/src/Flow/ETL/Filesystem/FilesystemStreams.php

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,15 @@ final class FilesystemStreams implements \Countable, \IteratorAggregate
1616
{
1717
public const FLOW_TMP_FILE_PREFIX = '._flow_php_tmp.';
1818

19-
private SaveMode $saveMode;
19+
private SaveMode $saveMode = SaveMode::ExceptionIfExists;
2020

2121
/**
2222
* @var array<string, array<string, DestinationStream>>
2323
*/
24-
private array $writingStreams;
24+
private array $writingStreams = [];
2525

2626
public function __construct(private readonly FilesystemTable $fstab)
2727
{
28-
$this->writingStreams = [];
29-
$this->saveMode = SaveMode::ExceptionIfExists;
3028
}
3129

3230
public function closeStreams(Path $path) : void
@@ -38,14 +36,13 @@ public function closeStreams(Path $path) : void
3836
foreach ($this->writingStreams as $nextBasePath => $nextStreams) {
3937
if ($path->uri() === $nextBasePath) {
4038
foreach ($nextStreams as $fileStream) {
41-
4239
if ($fileStream->isOpen()) {
4340
$fileStream->close();
4441
}
4542

4643
if ($this->saveMode === SaveMode::Overwrite) {
4744
if ($fileStream->path()->partitions()->count()) {
48-
$partitionFilesPatter = new Path($fileStream->path()->parentDirectory()->path() . '/*', $fileStream->path()->options());
45+
$partitionFilesPatter = new Path($fileStream->path()->parentDirectory()->uri() . '/*', $fileStream->path()->options());
4946

5047
foreach ($fs->list($partitionFilesPatter) as $partitionFile) {
5148
if (\str_contains($partitionFile->path->path(), self::FLOW_TMP_FILE_PREFIX)) {
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Tests\Double;
6+
7+
use Flow\Filesystem\{DestinationStream, FileStatus, Filesystem, Path, Protocol, SourceStream};
8+
use Flow\Filesystem\Exception\{InvalidArgumentException, RuntimeException};
9+
use Flow\Filesystem\Path\Filter;
10+
use Flow\Filesystem\Path\Filter\OnlyFiles;
11+
use Flow\Filesystem\Stream\{NativeLocalDestinationStream, NativeLocalSourceStream};
12+
use Webmozart\Glob\Glob;
13+
14+
final class FakeNativeLocalFilesystem implements Filesystem
15+
{
16+
/**
 * Opens a destination stream in append mode for the given path.
 *
 * The system temporary directory itself is refused as a target, the path is
 * handed to the protocol for scheme validation, and glob patterns are rejected.
 * When the parent directory has no status yet, it is created recursively first.
 *
 * @throws InvalidArgumentException when $path is a pattern
 * @throws RuntimeException when $path equals the system tmp dir or the parent directory could not be created
 */
public function appendTo(Path $path) : DestinationStream
{
    $tmpDir = $this->getSystemTmpDir();

    if ($path->isEqual($tmpDir)) {
        throw new RuntimeException('Cannot write to system tmp directory');
    }

    $this->protocol()->validateScheme($path);

    if ($path->isPattern()) {
        throw new InvalidArgumentException("Pattern paths can't be written: " . $path->uri());
    }

    if ($this->status($path->parentDirectory()) === null) {
        $directory = $path->parentDirectory()->path();
        $created = \mkdir($directory, recursive: true);

        // A failed mkdir() is tolerated as long as the directory exists afterwards.
        if (!$created && !\is_dir($directory)) {
            throw new RuntimeException(\sprintf('Directory "%s" was not created', $directory));
        }
    }

    return NativeLocalDestinationStream::openAppend($path);
}
36+
37+
/**
 * Returns the system temporary directory, as reported by sys_get_temp_dir(), wrapped in a Path.
 */
public function getSystemTmpDir() : Path
{
    $tmpDirectory = \sys_get_temp_dir();

    return new Path($tmpDirectory);
}
41+
42+
/**
 * Lazily yields FileStatus entries for the given path.
 *
 * For a pattern path every glob match is turned into a status and yielded when
 * the filter accepts it. For a plain path a single status is built directly
 * from the path (without checking that it exists) and yielded when accepted.
 *
 * @return \Generator<FileStatus>
 */
public function list(Path $path, Filter $pathFilter = new OnlyFiles()) : \Generator
{
    $this->protocol()->validateScheme($path);

    if ($path->isPattern()) {
        foreach (Glob::glob($path->path()) as $match) {
            $candidate = new FileStatus(Path::realpath($match, $path->options()), \is_file($match));

            if (!$pathFilter->accept($candidate)) {
                continue;
            }

            yield $candidate;
        }

        return;
    }

    $single = new FileStatus($path, \is_file($path->path()));

    if ($pathFilter->accept($single)) {
        yield $single;
    }
}
63+
64+
/**
 * Moves $from to $to, removing anything that already exists at the destination first.
 *
 * @return bool result of the underlying rename() call
 */
public function mv(Path $from, Path $to) : bool
{
    foreach ([$from, $to] as $endpoint) {
        $this->protocol()->validateScheme($endpoint);
    }

    $destination = $to->path();

    // rename() must not collide with an existing target, so clear it up front.
    if (\file_exists($destination)) {
        $this->rm($to);
    }

    return \rename($from->path(), $destination);
}
79+
80+
/**
 * Returns the protocol of this test double, built from the custom "fake" scheme.
 */
public function protocol() : Protocol
{
    $scheme = 'fake';

    return new Protocol($scheme);
}
84+
85+
/**
 * Opens a source stream for the given path.
 *
 * Patterns are rejected. When the parent directory has no status, it is created
 * recursively before the stream is opened.
 * NOTE(review): creating directories on a read path is unusual - confirm it is intended.
 *
 * @throws InvalidArgumentException when $path is a pattern
 * @throws RuntimeException when the parent directory could not be created
 */
public function readFrom(Path $path) : SourceStream
{
    $this->protocol()->validateScheme($path);

    if ($path->isPattern()) {
        throw new InvalidArgumentException("Pattern paths can't be open: " . $path->uri());
    }

    if ($this->status($path->parentDirectory()) === null) {
        $directory = $path->parentDirectory()->path();
        $created = \mkdir($directory, recursive: true);

        // A failed mkdir() is tolerated as long as the directory exists afterwards.
        if (!$created && !\is_dir($directory)) {
            throw new RuntimeException(\sprintf('Directory "%s" was not created', $directory));
        }
    }

    return NativeLocalSourceStream::open($path);
}
101+
102+
/**
 * Removes a file or directory, or every entry matching a pattern.
 *
 * Directories are removed recursively through the private rmdir() helper.
 *
 * @return bool for a plain path: false when nothing exists at it, true otherwise;
 *              for a pattern: true when at least one match was processed
 */
public function rm(Path $path) : bool
{
    $this->protocol()->validateScheme($path);

    if ($path->isPattern()) {
        $removed = 0;

        foreach (Glob::glob($path->path()) as $match) {
            if (\is_dir($match)) {
                $this->rmdir($match);
            } else {
                \unlink($match);
            }

            $removed++;
        }

        return (bool) $removed;
    }

    if (!\file_exists($path->path())) {
        return false;
    }

    if (\is_dir($path->path())) {
        $this->rmdir($path->path());

        return true;
    }

    \unlink($path->path());

    return true;
}
134+
135+
/**
 * Returns the status of the entry behind the given path, or null when nothing is found.
 *
 * A plain (non-pattern) path that exists is reported directly. Otherwise the
 * path is expanded through Glob and the first existing match is reported.
 *
 * @return null|FileStatus status of the first existing entry, null when there is none
 */
public function status(Path $path) : ?FileStatus
{
    $this->protocol()->validateScheme($path);

    if (!$path->isPattern() && \file_exists($path->path())) {
        return new FileStatus($path, \is_file($path->path()));
    }

    foreach (Glob::glob($path->path()) as $filePath) {
        if (\file_exists($filePath)) {
            // A glob match can be a directory, so detect the entry type instead of
            // hard-coding "file"; this keeps the flag consistent with the branch above.
            return new FileStatus(new Path($filePath, $path->options()), \is_file($filePath));
        }
    }

    return null;
}
154+
155+
/**
 * Opens a blank destination stream for the given path.
 *
 * The system temporary directory itself is refused as a target, the path is
 * handed to the protocol for scheme validation, and glob patterns are rejected.
 * When the parent directory has no status yet, it is created recursively first.
 *
 * @throws InvalidArgumentException when $path is a pattern
 * @throws RuntimeException when $path equals the system tmp dir or the parent directory could not be created
 */
public function writeTo(Path $path) : DestinationStream
{
    $tmpDir = $this->getSystemTmpDir();

    if ($path->isEqual($tmpDir)) {
        throw new RuntimeException('Cannot write to system tmp directory');
    }

    $this->protocol()->validateScheme($path);

    if ($path->isPattern()) {
        throw new InvalidArgumentException("Pattern paths can't be written: " . $path->uri());
    }

    if ($this->status($path->parentDirectory()) === null) {
        $directory = $path->parentDirectory()->path();
        $created = \mkdir($directory, recursive: true);

        // A failed mkdir() is tolerated as long as the directory exists afterwards.
        if (!$created && !\is_dir($directory)) {
            throw new RuntimeException(\sprintf('Directory "%s" was not created', $directory));
        }
    }

    return NativeLocalDestinationStream::openBlank($path);
}
175+
176+
/**
 * Recursively deletes a directory together with everything inside it.
 *
 * @throws InvalidArgumentException when $dirPath is not a directory
 * @throws RuntimeException when the directory listing can't be obtained
 */
private function rmdir(string $dirPath) : void
{
    if (!\is_dir($dirPath)) {
        throw new InvalidArgumentException("{$dirPath} must be a directory");
    }

    // Normalize to a trailing slash so entry names can be appended directly.
    $dirPath = \str_ends_with($dirPath, '/') ? $dirPath : $dirPath . '/';

    $entries = \scandir($dirPath);

    if (!$entries) {
        throw new RuntimeException("Can't read directory: {$dirPath}");
    }

    // Drop the self and parent pseudo-entries before descending.
    foreach (\array_diff($entries, ['.', '..']) as $entry) {
        $entryPath = $dirPath . $entry;

        if (!\is_dir($entryPath)) {
            \unlink($entryPath);

            continue;
        }

        $this->rmdir($entryPath);
    }

    \rmdir($dirPath);
}
208+
}

src/core/etl/tests/Flow/ETL/Tests/Integration/Filesystem/FilesystemStreams/Partitioned/OverwriteModeTest.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66

77
use function Flow\ETL\DSL\overwrite;
88
use Flow\ETL\Filesystem\{FilesystemStreams};
9+
use Flow\ETL\Tests\Double\FakeNativeLocalFilesystem;
910
use Flow\ETL\Tests\Integration\Filesystem\FilesystemStreams\FilesystemStreamsTestCase;
10-
use Flow\Filesystem\{Partition, Path};
11+
use Flow\Filesystem\{FilesystemTable, Partition, Path};
1112

1213
final class OverwriteModeTest extends FilesystemStreamsTestCase
1314
{
@@ -85,6 +86,29 @@ public function test_open_stream_for_non_existing_partition() : void
8586
self::assertSame('new content', \file_get_contents($files[0]->path->path()));
8687
}
8788

89+
/**
 * Writes a partitioned file in overwrite mode through a filesystem registered
 * under a custom scheme and checks that exactly one file with the expected
 * name and content ends up in the partition directory.
 */
public function test_open_stream_for_non_existing_partition_with_custom_schema() : void
{
    $this->setupFiles([__FUNCTION__ => []]);

    $fakeFs = new FakeNativeLocalFilesystem();
    $scheme = $fakeFs->protocol()->scheme();

    $target = new Path($scheme . $this->filesDirectory() . DIRECTORY_SEPARATOR . __FUNCTION__ . '/file.txt');

    $streams = new FilesystemStreams(new FilesystemTable($fakeFs));
    $streams->setSaveMode(overwrite());

    $stream = $streams->writeTo($target, partitions: [new Partition('partition', 'value')]);
    $stream->append('new content');

    $streams->closeStreams($target);

    // The result is inspected through the test case's own filesystem, using the plain path.
    $partitionPattern = new Path($target->parentDirectory()->path() . '/partition=value/*');
    $written = \iterator_to_array($this->fs()->list($partitionPattern));

    self::assertCount(1, $written);

    self::assertSame('file.txt', $written[0]->path->basename());
    self::assertSame('new content', \file_get_contents($written[0]->path->path()));
}
111+
88112
protected function streams() : FilesystemStreams
89113
{
90114
$streams = new FilesystemStreams($this->fstab());

0 commit comments

Comments
 (0)