Skip to content

Commit d765945

Browse files
authored
Merge pull request #852 from utopia-php/csv-import-upsert-v2
Add skipDuplicates() scope guard to createDocuments
2 parents dd009a5 + fbe5117 commit d765945

10 files changed

Lines changed: 738 additions & 36 deletions

File tree

src/Database/Adapter.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ abstract class Adapter
3333

3434
protected bool $alterLocks = false;
3535

36+
protected bool $skipDuplicates = false;
37+
3638
/**
3739
* @var array<string, mixed>
3840
*/
@@ -392,6 +394,27 @@ public function inTransaction(): bool
392394
return $this->inTransaction > 0;
393395
}
394396

397+
/**
 * Execute a callback while the skipDuplicates flag is switched on.
 *
 * While the flag is on, duplicate key errors raised during createDocuments()
 * are meant to be skipped silently rather than thrown. The value the flag held
 * on entry is put back once the callback finishes — whether it returns or
 * throws — so calls may be nested.
 *
 * @template T
 * @param callable(): T $callback
 * @return T
 */
public function skipDuplicates(callable $callback): mixed
{
    $wasEnabled = $this->skipDuplicates;
    $this->skipDuplicates = true;

    try {
        $result = $callback();
    } finally {
        // Restore (not reset) so an outer skipDuplicates() scope stays active.
        $this->skipDuplicates = $wasEnabled;
    }

    return $result;
}
417+
395418
/**
396419
* @template T
397420
* @param callable(): T $callback

src/Database/Adapter/Mongo.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ public function withTransaction(callable $callback): mixed
122122
return $callback();
123123
}
124124

125+
// upsert + $setOnInsert hits WriteConflict (E112) under txn snapshot isolation.
126+
if ($this->skipDuplicates) {
127+
return $callback();
128+
}
129+
125130
try {
126131
$this->startTransaction();
127132
$result = $callback();
@@ -1492,6 +1497,42 @@ public function createDocuments(Document $collection, array $documents): array
14921497
$records[] = $record;
14931498
}
14941499

1500+
// insertMany aborts the txn on any duplicate; upsert + $setOnInsert no-ops instead.
1501+
if ($this->skipDuplicates) {
1502+
if (empty($records)) {
1503+
return [];
1504+
}
1505+
1506+
$operations = [];
1507+
foreach ($records as $record) {
1508+
$filter = ['_uid' => $record['_uid'] ?? ''];
1509+
if ($this->sharedTables) {
1510+
$filter['_tenant'] = $record['_tenant'] ?? $this->getTenant();
1511+
}
1512+
1513+
// Filter fields can't reappear in $setOnInsert (mongo path-conflict error).
1514+
$setOnInsert = $record;
1515+
unset($setOnInsert['_uid'], $setOnInsert['_tenant']);
1516+
1517+
if (empty($setOnInsert)) {
1518+
continue;
1519+
}
1520+
1521+
$operations[] = [
1522+
'filter' => $filter,
1523+
'update' => ['$setOnInsert' => $setOnInsert],
1524+
];
1525+
}
1526+
1527+
try {
1528+
$this->client->upsert($name, $operations, $options);
1529+
} catch (MongoException $e) {
1530+
throw $this->processException($e);
1531+
}
1532+
1533+
return $documents;
1534+
}
1535+
14951536
try {
14961537
$documents = $this->client->insertMany($name, $records, $options);
14971538
} catch (MongoException $e) {

src/Database/Adapter/Pool.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public function __construct(UtopiaPool $pool)
4343
public function delegate(string $method, array $args): mixed
4444
{
4545
if ($this->pinnedAdapter !== null) {
46+
if ($this->skipDuplicates) {
47+
return $this->pinnedAdapter->skipDuplicates(
48+
fn () => $this->pinnedAdapter->{$method}(...$args)
49+
);
50+
}
4651
return $this->pinnedAdapter->{$method}(...$args);
4752
}
4853

@@ -66,6 +71,11 @@ public function delegate(string $method, array $args): mixed
6671
$adapter->setMetadata($key, $value);
6772
}
6873

74+
if ($this->skipDuplicates) {
75+
return $adapter->skipDuplicates(
76+
fn () => $adapter->{$method}(...$args)
77+
);
78+
}
6979
return $adapter->{$method}(...$args);
7080
});
7181
}
@@ -146,6 +156,11 @@ public function withTransaction(callable $callback): mixed
146156

147157
$this->pinnedAdapter = $adapter;
148158
try {
159+
if ($this->skipDuplicates) {
160+
return $adapter->skipDuplicates(
161+
fn () => $adapter->withTransaction($callback)
162+
);
163+
}
149164
return $adapter->withTransaction($callback);
150165
} finally {
151166
$this->pinnedAdapter = null;

src/Database/Adapter/Postgres.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2350,6 +2350,35 @@ public function getSupportForOptionalSpatialAttributeWithExistingRows(): bool
23502350
return false;
23512351
}
23522352

2353+
/**
 * Statement prefix used when inserting rows.
 *
 * Always a plain INSERT, regardless of the skipDuplicates flag; this adapter
 * expresses duplicate handling through the ON CONFLICT suffix helpers instead.
 */
protected function getInsertKeyword(): string
{
    return 'INSERT INTO';
}
2357+
2358+
/**
 * Clause appended after the VALUES list of the documents INSERT.
 *
 * Yields an empty string unless skipDuplicates is enabled. When enabled, the
 * result is an ON CONFLICT ... DO NOTHING clause targeting ("_uid"), or
 * ("_uid", "_tenant") when shared tables are in use.
 *
 * @param string $table Not used by this implementation.
 */
protected function getInsertSuffix(string $table): string
{
    if ($this->skipDuplicates) {
        $columns = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';

        return "ON CONFLICT {$columns} DO NOTHING";
    }

    return '';
}
2368+
2369+
/**
 * Clause appended after the VALUES list of the permissions INSERT.
 *
 * Yields an empty string unless skipDuplicates is enabled. When enabled, the
 * result is an ON CONFLICT ... DO NOTHING clause whose target is
 * ("_type", "_permission", "_document"), extended with "_tenant" when shared
 * tables are in use.
 */
protected function getInsertPermissionsSuffix(): string
{
    if ($this->skipDuplicates) {
        $columns = '("_type", "_permission", "_document")';
        if ($this->sharedTables) {
            $columns = '("_type", "_permission", "_document", "_tenant")';
        }

        return "ON CONFLICT {$columns} DO NOTHING";
    }

    return '';
}
2381+
23532382
public function decodePoint(string $wkb): array
23542383
{
23552384
if (str_starts_with(strtoupper($wkb), 'POINT(')) {

src/Database/Adapter/SQL.php

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,33 @@ public function getSupportForHostname(): bool
10291029
return true;
10301030
}
10311031

1032+
/**
 * Statement prefix used when inserting rows.
 *
 * Produces INSERT IGNORE INTO while skipDuplicates is enabled and a plain
 * INSERT INTO otherwise. Adapter subclasses override this where the database
 * needs different syntax.
 *
 * NOTE(review): on MySQL/MariaDB, IGNORE also downgrades some non-duplicate
 * errors to warnings — confirm that is acceptable for callers.
 */
protected function getInsertKeyword(): string
{
    if ($this->skipDuplicates) {
        return 'INSERT IGNORE INTO';
    }

    return 'INSERT INTO';
}
1040+
1041+
/**
 * Clause appended after the VALUES list of the documents INSERT for
 * duplicate handling.
 *
 * This base implementation adds nothing. Adapter subclasses override it when
 * their database needs a trailing clause (e.g. Postgres uses
 * ON CONFLICT DO NOTHING).
 *
 * @param string $table Not used by this implementation.
 */
protected function getInsertSuffix(string $table): string
{
    return '';
}
1049+
1050+
/**
 * Clause appended to the permissions INSERT statement when duplicates are
 * being ignored.
 *
 * This base implementation adds nothing. Adapter subclasses override it where
 * the database needs specific syntax.
 */
protected function getInsertPermissionsSuffix(): string
{
    return '';
}
1058+
10321059
/**
10331060
* Get current attribute count from collection document
10341061
*
@@ -2476,6 +2503,7 @@ public function createDocuments(Document $collection, array $documents): array
24762503
if (empty($documents)) {
24772504
return $documents;
24782505
}
2506+
24792507
$spatialAttributes = $this->getSpatialAttributes($collection);
24802508
$collection = $collection->getId();
24812509
try {
@@ -2573,8 +2601,9 @@ public function createDocuments(Document $collection, array $documents): array
25732601
$batchKeys = \implode(', ', $batchKeys);
25742602

25752603
$stmt = $this->getPDO()->prepare("
2576-
INSERT INTO {$this->getSQLTable($name)} {$columns}
2604+
{$this->getInsertKeyword()} {$this->getSQLTable($name)} {$columns}
25772605
VALUES {$batchKeys}
2606+
{$this->getInsertSuffix($name)}
25782607
");
25792608

25802609
foreach ($bindValues as $key => $value) {
@@ -2588,8 +2617,9 @@ public function createDocuments(Document $collection, array $documents): array
25882617
$permissions = \implode(', ', $permissions);
25892618

25902619
$sqlPermissions = "
2591-
INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
2592-
VALUES {$permissions};
2620+
{$this->getInsertKeyword()} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
2621+
VALUES {$permissions}
2622+
{$this->getInsertPermissionsSuffix()}
25932623
";
25942624

25952625
$stmtPermissions = $this->getPDO()->prepare($sqlPermissions);

src/Database/Adapter/SQLite.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1936,4 +1936,9 @@ public function getSupportForTTLIndexes(): bool
19361936
{
19371937
return false;
19381938
}
1939+
1940+
/**
 * Statement prefix used when inserting rows.
 *
 * Produces INSERT OR IGNORE INTO while skipDuplicates is enabled and a plain
 * INSERT INTO otherwise.
 */
protected function getInsertKeyword(): string
{
    if ($this->skipDuplicates) {
        return 'INSERT OR IGNORE INTO';
    }

    return 'INSERT INTO';
}
19391944
}

src/Database/Database.php

Lines changed: 81 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,8 @@ class Database
417417

418418
protected bool $preserveDates = false;
419419

420+
protected bool $skipDuplicates = false;
421+
420422
protected bool $preserveSequence = false;
421423

422424
protected int $maxQueryValues = 5000;
@@ -842,6 +844,29 @@ public function skipRelationshipsExistCheck(callable $callback): mixed
842844
}
843845
}
844846

847+
/**
 * Execute a callback with the skipDuplicates flag switched on.
 *
 * The flag's value on entry is put back once the callback finishes, whether
 * it returns or throws, so calls may be nested.
 *
 * @template T
 * @param callable(): T $callback
 * @return T
 */
public function skipDuplicates(callable $callback): mixed
{
    $wasEnabled = $this->skipDuplicates;
    $this->skipDuplicates = true;

    try {
        $result = $callback();
    } finally {
        // Restore (not reset) so an outer skipDuplicates() scope stays active.
        $this->skipDuplicates = $wasEnabled;
    }

    return $result;
}
858+
859+
/**
 * Compute the tenant-aware identity key for a document.
 *
 * When the adapter uses shared tables with tenant-per-document enabled, the
 * key is "<tenant>:<id>"; in every other mode it is the document id alone.
 */
private function tenantKey(Document $document): string
{
    if (!$this->adapter->getSharedTables() || !$this->adapter->getTenantPerDocument()) {
        return $document->getId();
    }

    $tenant = $document->getTenant();

    return $tenant . ':' . $document->getId();
}
869+
845870
/**
846871
* Trigger callback for events
847872
*
@@ -5700,9 +5725,11 @@ public function createDocuments(
57005725
}
57015726

57025727
foreach (\array_chunk($documents, $batchSize) as $chunk) {
5703-
$batch = $this->withTransaction(function () use ($collection, $chunk) {
5704-
return $this->adapter->createDocuments($collection, $chunk);
5705-
});
5728+
$insert = fn () => $this->withTransaction(fn () => $this->adapter->createDocuments($collection, $chunk));
5729+
// Set adapter flag before withTransaction so Mongo can opt out of a real txn.
5730+
$batch = $this->skipDuplicates
5731+
? $this->adapter->skipDuplicates($insert)
5732+
: $insert();
57065733

57075734
$batch = $this->adapter->getSequences($collection->getId(), $batch);
57085735

@@ -7116,18 +7143,57 @@ public function upsertDocumentsWithIncrease(
71167143
$created = 0;
71177144
$updated = 0;
71187145
$seenIds = [];
7119-
foreach ($documents as $key => $document) {
7120-
if ($this->getSharedTables() && $this->getTenantPerDocument()) {
7121-
$old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument(
7122-
$collection->getId(),
7123-
$document->getId(),
7124-
))));
7125-
} else {
7126-
$old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument(
7127-
$collection->getId(),
7128-
$document->getId(),
7129-
)));
7146+
7147+
// Batch-fetch existing documents in one query instead of N individual getDocument() calls.
7148+
// tenantPerDocument: group ids by tenant and run one find() per tenant under withTenant,
7149+
// so cross-tenant batches (e.g. StatsUsage worker) don't get silently scoped to the
7150+
// session tenant and miss rows belonging to other tenants.
7151+
$existingDocs = [];
7152+
7153+
if ($this->getSharedTables() && $this->getTenantPerDocument()) {
7154+
$idsByTenant = [];
7155+
foreach ($documents as $doc) {
7156+
if ($doc->getId() !== '') {
7157+
$idsByTenant[$doc->getTenant()][] = $doc->getId();
7158+
}
7159+
}
7160+
foreach ($idsByTenant as $tenant => $tenantIds) {
7161+
$tenantIds = \array_values(\array_unique($tenantIds));
7162+
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $chunk) {
7163+
$found = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(
7164+
fn () => $this->find($collection->getId(), [
7165+
Query::equal('$id', $chunk),
7166+
Query::limit($this->maxQueryValues),
7167+
])
7168+
)));
7169+
foreach ($found as $doc) {
7170+
$existingDocs[$this->tenantKey($doc)] = $doc;
7171+
}
7172+
}
71307173
}
7174+
} else {
7175+
$docIds = \array_values(\array_unique(\array_filter(
7176+
\array_map(fn (Document $doc) => $doc->getId(), $documents),
7177+
fn ($id) => $id !== ''
7178+
)));
7179+
7180+
if (!empty($docIds)) {
7181+
foreach (\array_chunk($docIds, \max(1, $this->maxQueryValues)) as $chunk) {
7182+
$existing = $this->authorization->skip(fn () => $this->silent(
7183+
fn () => $this->find($collection->getId(), [
7184+
Query::equal('$id', $chunk),
7185+
Query::limit($this->maxQueryValues),
7186+
])
7187+
));
7188+
foreach ($existing as $doc) {
7189+
$existingDocs[$this->tenantKey($doc)] = $doc;
7190+
}
7191+
}
7192+
}
7193+
}
7194+
7195+
foreach ($documents as $key => $document) {
7196+
$old = $existingDocs[$this->tenantKey($document)] ?? new Document();
71317197

71327198
// Extract operators early to avoid comparison issues
71337199
$documentArray = $document->getArrayCopy();
@@ -7294,7 +7360,7 @@ public function upsertDocumentsWithIncrease(
72947360
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
72957361
}
72967362

7297-
$seenIds[] = $document->getId();
7363+
$seenIds[] = $this->tenantKey($document);
72987364
$old = $this->adapter->castingBefore($collection, $old);
72997365
$document = $this->adapter->castingBefore($collection, $document);
73007366

0 commit comments

Comments
 (0)