Skip to content

Commit 545705e

Browse files
authored
Merge pull request #72 from utopia-php/feat-csv
Feat CSV source
2 parents 84163e1 + f321d08 commit 545705e

1 file changed

Lines changed: 280 additions & 0 deletions

File tree

src/Migration/Sources/CSV.php

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
<?php
2+
3+
namespace Utopia\Migration\Sources;
4+
5+
use Utopia\Database\Database as UtopiaDatabase;
6+
use Utopia\Migration\Exception;
7+
use Utopia\Migration\Resource;
8+
use Utopia\Migration\Resources\Database\Attribute;
9+
use Utopia\Migration\Resources\Database\Collection;
10+
use Utopia\Migration\Resources\Database\Database;
11+
use Utopia\Migration\Resources\Database\Document;
12+
use Utopia\Migration\Resources\Storage\File;
13+
use Utopia\Migration\Source;
14+
use Utopia\Migration\Sources\Appwrite\Reader;
15+
use Utopia\Migration\Sources\Appwrite\Reader\Database as DatabaseReader;
16+
use Utopia\Migration\Transfer;
17+
use Utopia\Storage\Device;
18+
19+
class CSV extends Source
20+
{
21+
private string $filePath;
22+
23+
/**
24+
* format: `{databaseId:collectionId}`
25+
*/
26+
private string $resourceId;
27+
28+
private Device $device;
29+
30+
private Reader $database;
31+
32+
public function __construct(
33+
string $resourceId,
34+
string $filePath,
35+
Device $device,
36+
?UtopiaDatabase $dbForProject
37+
) {
38+
$this->device = $device;
39+
$this->filePath = $filePath;
40+
$this->resourceId = $resourceId;
41+
$this->database = new DatabaseReader($dbForProject);
42+
}
43+
44+
public static function getName(): string
45+
{
46+
return 'CSV';
47+
}
48+
49+
public static function getSupportedResources(): array
50+
{
51+
return [
52+
Resource::TYPE_DOCUMENT,
53+
];
54+
}
55+
56+
// called before the `exportGroupDatabases`.
57+
public function report(array $resources = []): array
58+
{
59+
$report = [];
60+
61+
if (! $this->device->exists($this->filePath)) {
62+
return $report;
63+
}
64+
65+
$file = new \SplFileObject($this->filePath, 'r');
66+
$file->setFlags(\SplFileObject::READ_CSV | \SplFileObject::SKIP_EMPTY);
67+
68+
$file->seek(PHP_INT_MAX);
69+
$rowCount = max(0, $file->key());
70+
$rowCount = $rowCount > 0 ? $rowCount - 1 : 0;
71+
72+
$report[Resource::TYPE_DOCUMENT] = $rowCount;
73+
74+
return $report;
75+
}
76+
77+
/**
78+
* @throws \Exception
79+
*/
80+
protected function exportGroupAuth(int $batchSize, array $resources): void
81+
{
82+
throw new \Exception('Not Implemented');
83+
}
84+
85+
protected function exportGroupDatabases(int $batchSize, array $resources): void
86+
{
87+
try {
88+
if (\in_array(Resource::TYPE_DOCUMENT, $resources)) {
89+
$this->exportDocuments($batchSize);
90+
}
91+
} catch (\Throwable $e) {
92+
$this->addError(
93+
new Exception(
94+
Resource::TYPE_DOCUMENT,
95+
Transfer::GROUP_DATABASES,
96+
message: $e->getMessage(),
97+
code: $e->getCode(),
98+
previous: $e
99+
)
100+
);
101+
} finally {
102+
// delete the temporary file!
103+
$this->device->delete($this->filePath);
104+
}
105+
}
106+
107+
/**
108+
* @throws \Exception
109+
*/
110+
private function exportDocuments(int $batchSize): void
111+
{
112+
113+
$attributes = [];
114+
$lastAttribute = null;
115+
116+
[$databaseId, $collectionId] = explode(':', $this->resourceId);
117+
$database = new Database($databaseId, '');
118+
$collection = new Collection($database, '', $collectionId);
119+
120+
while (true) {
121+
$queries = [$this->database->queryLimit($batchSize)];
122+
if ($lastAttribute) {
123+
$queries[] = $this->database->queryCursorAfter($lastAttribute);
124+
}
125+
126+
$fetched = $this->database->listAttributes($collection, $queries);
127+
if (empty($fetched)) {
128+
break;
129+
}
130+
131+
array_push($attributes, ...$fetched);
132+
$lastAttribute = $fetched[count($fetched) - 1];
133+
134+
if (count($fetched) < $batchSize) {
135+
break;
136+
}
137+
}
138+
139+
$attributeTypes = [];
140+
$manyToManyKeys = [];
141+
142+
foreach ($attributes as $attribute) {
143+
$key = $attribute['key'];
144+
145+
if (
146+
$attribute['type'] === Attribute::TYPE_RELATIONSHIP &&
147+
($attribute['side'] ?? '') === UtopiaDatabase::RELATION_SIDE_CHILD
148+
) {
149+
continue;
150+
}
151+
152+
$attributeTypes[$key] = $attribute['type'];
153+
154+
if (
155+
$attribute['type'] === Attribute::TYPE_RELATIONSHIP &&
156+
($attribute['relationType'] ?? '') === 'manyToMany' &&
157+
($attribute['side'] ?? '') === 'parent'
158+
) {
159+
$manyToManyKeys[] = $key;
160+
}
161+
}
162+
163+
$this->withCSVStream(function ($stream) use ($attributeTypes, $manyToManyKeys, $collection, $batchSize) {
164+
$headers = fgetcsv($stream);
165+
if (! is_array($headers) || count($headers) === 0) {
166+
return;
167+
}
168+
169+
$buffer = [];
170+
171+
while (($row = fgetcsv($stream)) !== false) {
172+
$data = array_combine($headers, $row);
173+
if ($data === false) {
174+
continue;
175+
}
176+
177+
$parsedData = $data;
178+
179+
foreach ($data as $key => $value) {
180+
$parsedValue = trim($value);
181+
$type = $attributeTypes[$key] ?? null;
182+
183+
if (! isset($type) || $parsedValue === '') {
184+
continue;
185+
}
186+
187+
if (in_array($key, $manyToManyKeys, true)) {
188+
$parsedData[$key] = str_contains($parsedValue, ',')
189+
? array_map('trim', explode(',', $parsedValue))
190+
: [$parsedValue];
191+
192+
continue;
193+
}
194+
195+
$parsedData[$key] = match ($type) {
196+
Attribute::TYPE_INTEGER => is_numeric($parsedValue) ? (int) $parsedValue : null,
197+
Attribute::TYPE_FLOAT => is_numeric($parsedValue) ? (float) $parsedValue : null,
198+
Attribute::TYPE_BOOLEAN => filter_var($parsedValue, FILTER_VALIDATE_BOOLEAN),
199+
default => $parsedValue,
200+
};
201+
}
202+
203+
$documentId = $parsedData['$id'] ?? 'unique()';
204+
205+
// `$id`, `$permissions` in the doc can cause issues!
206+
unset($parsedData['$id'], $parsedData['$permissions']);
207+
208+
$document = new Document($documentId, $collection, $parsedData);
209+
$buffer[] = $document;
210+
211+
if (count($buffer) === $batchSize) {
212+
$this->callback($buffer);
213+
$buffer = [];
214+
}
215+
}
216+
217+
if (! empty($buffer)) {
218+
$this->callback($buffer);
219+
}
220+
});
221+
}
222+
223+
/**
224+
* @throws \Exception
225+
*/
226+
protected function exportGroupStorage(int $batchSize, array $resources): void
227+
{
228+
throw new \Exception('Not Implemented');
229+
}
230+
231+
/**
232+
* @throws \Exception
233+
*/
234+
protected function exportBuckets(int $batchSize): void
235+
{
236+
throw new \Exception('Not Implemented');
237+
}
238+
239+
/**
240+
* @throws \Exception
241+
*/
242+
private function exportFiles(int $batchSize): void
243+
{
244+
throw new \Exception('Not Implemented');
245+
}
246+
247+
/**
248+
* @throws \Exception
249+
*/
250+
private function exportFile(File $file): void
251+
{
252+
throw new \Exception('Not Implemented');
253+
}
254+
255+
/**
256+
* @throws \Exception
257+
*/
258+
protected function exportGroupFunctions(int $batchSize, array $resources): void
259+
{
260+
throw new \Exception('Not Implemented');
261+
}
262+
263+
private function withCsvStream(callable $fn): void
264+
{
265+
if (! $this->device->exists($this->filePath)) {
266+
return;
267+
}
268+
269+
$stream = fopen($this->filePath, 'r');
270+
if (! $stream) {
271+
return;
272+
}
273+
274+
try {
275+
$fn($stream);
276+
} finally {
277+
fclose($stream);
278+
}
279+
}
280+
}

0 commit comments

Comments
 (0)