diff --git a/src/Database/Adapter.php b/src/Database/Adapter.php
index a7b385cce..9cc83d141 100644
--- a/src/Database/Adapter.php
+++ b/src/Database/Adapter.php
@@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document
      *
      * @param Document $collection
      * @param array $documents
+     * @param bool $ignore If true, silently ignore duplicate documents instead of throwing
      *
      * @return array
      *
      * @throws DatabaseException
      */
-    abstract public function createDocuments(Document $collection, array $documents): array;
+    abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array;
 
     /**
      * Update Document
diff --git a/src/Database/Adapter/Mongo.php b/src/Database/Adapter/Mongo.php
index 7ddde43d3..6ecae691d 100644
--- a/src/Database/Adapter/Mongo.php
+++ b/src/Database/Adapter/Mongo.php
@@ -1460,11 +1460,18 @@ public function castingBefore(Document $collection, Document $document): Documen
      * @throws DuplicateException
      * @throws DatabaseException
      */
-    public function createDocuments(Document $collection, array $documents): array
+    public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
     {
         $name = $this->getNamespace() . '_' . $this->filter($collection->getId());
 
-        $options = $this->getTransactionOptions();
+        if ($ignore) {
+            // Run outside transaction — MongoDB aborts transactions on any write error,
+            // so ordered:false + session would roll back even successfully inserted docs.
+            $options = ['ordered' => false];
+        } else {
+            $options = $this->getTransactionOptions();
+        }
+
         $records = [];
         $hasSequence = null;
         $documents = \array_map(fn ($doc) => clone $doc, $documents);
@@ -1490,7 +1497,16 @@ public function createDocuments(Document $collection, array $documents): array
         try {
             $documents = $this->client->insertMany($name, $records, $options);
         } catch (MongoException $e) {
-            throw $this->processException($e);
+            $processed = $this->processException($e);
+
+            if ($ignore && $processed instanceof DuplicateException) {
+                // Race condition: a doc was inserted between pre-filter and insertMany.
+                // With ordered:false outside transaction, non-duplicate inserts persist.
+                // Return empty — we cannot determine which docs succeeded without querying.
+                return [];
+            }
+
+            throw $processed;
         }
 
         foreach ($documents as $index => $document) {
diff --git a/src/Database/Adapter/Pool.php b/src/Database/Adapter/Pool.php
index 668753387..ec508c3e4 100644
--- a/src/Database/Adapter/Pool.php
+++ b/src/Database/Adapter/Pool.php
@@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume
         return $this->delegate(__FUNCTION__, \func_get_args());
     }
 
-    public function createDocuments(Document $collection, array $documents): array
+    public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
     {
         return $this->delegate(__FUNCTION__, \func_get_args());
     }
diff --git a/src/Database/Adapter/Postgres.php b/src/Database/Adapter/Postgres.php
index 8dcf72025..814ecc8bc 100644
--- a/src/Database/Adapter/Postgres.php
+++ b/src/Database/Adapter/Postgres.php
@@ -1365,6 +1365,47 @@ public function updateDocument(Document $collection, string $id, Document $docum
         return $document;
     }
 
+    /**
+     * Postgres always uses a plain INSERT; duplicates are skipped through the
+     * ON CONFLICT suffixes returned by the methods below.
+     */
+    protected function getInsertKeyword(bool $ignore): string
+    {
+        return 'INSERT INTO';
+    }
+
+    /**
+     * ON CONFLICT clause for the documents table when ignoring duplicates.
+     * The conflict target includes "_tenant" when shared tables are enabled.
+     */
+    protected function getInsertSuffix(bool $ignore, string $table): string
+    {
+        if (!$ignore) {
+            return '';
+        }
+
+        $conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';
+
+        return "ON CONFLICT {$conflictTarget} DO NOTHING";
+    }
+
+    /**
+     * ON CONFLICT clause for the permissions table when ignoring duplicates.
+     * The conflict target includes "_tenant" when shared tables are enabled.
+     */
+    protected function getInsertPermissionsSuffix(bool $ignore): string
+    {
+        if (!$ignore) {
+            return '';
+        }
+
+        $conflictTarget = $this->sharedTables
+            ? '("_type", "_permission", "_document", "_tenant")'
+            : '("_type", "_permission", "_document")';
+
+        return "ON CONFLICT {$conflictTarget} DO NOTHING";
+    }
+
     /**
      * @param string $tableName
      * @param string $columns
diff --git a/src/Database/Adapter/SQL.php b/src/Database/Adapter/SQL.php
index 6864e6aee..dbe66ce12 100644
--- a/src/Database/Adapter/SQL.php
+++ b/src/Database/Adapter/SQL.php
@@ -2471,7 +2471,7 @@ protected function execute(mixed $stmt): bool
      * @throws DuplicateException
      * @throws \Throwable
      */
-    public function createDocuments(Document $collection, array $documents): array
+    public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
     {
         if (empty($documents)) {
             return $documents;
@@ -2573,8 +2573,9 @@ public function createDocuments(Document $collection, array $documents): array
             $batchKeys = \implode(', ', $batchKeys);
 
             $stmt = $this->getPDO()->prepare("
-                INSERT INTO {$this->getSQLTable($name)} {$columns}
+                {$this->getInsertKeyword($ignore)} {$this->getSQLTable($name)} {$columns}
                 VALUES {$batchKeys}
+                {$this->getInsertSuffix($ignore, $name)}
             ");
 
             foreach ($bindValues as $key => $value) {
@@ -2588,8 +2589,9 @@ public function createDocuments(Document $collection, array $documents): array
                 $permissions = \implode(', ', $permissions);
 
                 $sqlPermissions = "
-                    INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
-                    VALUES {$permissions};
+                    {$this->getInsertKeyword($ignore)} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
+                    VALUES {$permissions}
+                    {$this->getInsertPermissionsSuffix($ignore)}
                 ";
 
                 $stmtPermissions = $this->getPDO()->prepare($sqlPermissions);
@@ -2608,6 +2610,35 @@ public function createDocuments(Document $collection, array $documents): array
         return $documents;
     }
 
+    /**
+     * Returns the INSERT keyword, optionally with IGNORE for duplicate handling.
+     * Override in adapter subclasses for DB-specific syntax.
+     * NOTE(review): MySQL/MariaDB INSERT IGNORE also downgrades non-duplicate errors
+     * to warnings — confirm that is acceptable for callers of ignore mode.
+     */
+    protected function getInsertKeyword(bool $ignore): string
+    {
+        return $ignore ? 'INSERT IGNORE INTO' : 'INSERT INTO';
+    }
+
+    /**
+     * Returns a suffix appended after VALUES clause for duplicate handling.
+     * Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING).
+     */
+    protected function getInsertSuffix(bool $ignore, string $table): string
+    {
+        return '';
+    }
+
+    /**
+     * Returns a suffix for the permissions INSERT statement when ignoring duplicates.
+     * Override in adapter subclasses for DB-specific syntax.
+     */
+    protected function getInsertPermissionsSuffix(bool $ignore): string
+    {
+        return '';
+    }
+
     /**
      * @param Document $collection
      * @param string $attribute
diff --git a/src/Database/Adapter/SQLite.php b/src/Database/Adapter/SQLite.php
index 3c25987eb..8e7ef6b5b 100644
--- a/src/Database/Adapter/SQLite.php
+++ b/src/Database/Adapter/SQLite.php
@@ -34,6 +34,14 @@
  */
 class SQLite extends MariaDB
 {
+    /**
+     * SQLite spells duplicate-tolerant inserts as INSERT OR IGNORE.
+     */
+    protected function getInsertKeyword(bool $ignore): string
+    {
+        return $ignore ? 'INSERT OR IGNORE INTO' : 'INSERT INTO';
+    }
+
     /**
      * @inheritDoc
      */
diff --git a/src/Database/Database.php b/src/Database/Database.php
index ac58d72f0..5e66334fb 100644
--- a/src/Database/Database.php
+++ b/src/Database/Database.php
@@ -5621,6 +5621,7 @@ public function createDocument(string $collection, Document $document): Document
      * @param int $batchSize
      * @param (callable(Document): void)|null $onNext
      * @param (callable(Throwable): void)|null $onError
+     * @param bool $ignore If true, silently ignore duplicate documents instead of throwing
      * @return int
      * @throws AuthorizationException
      * @throws StructureException
@@ -5633,6 +5634,7 @@ public function createDocuments(
         int $batchSize = self::INSERT_BATCH_SIZE,
         ?callable $onNext = null,
         ?callable $onError = null,
+        bool $ignore = false,
     ): int {
         if (!$this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) {
             throw new DatabaseException('Shared tables must be enabled if tenant per document is enabled.');
@@ -5653,6 +5655,83 @@ public function createDocuments(
         $time = DateTime::now();
         $modified = 0;
 
+        // In tenant-per-document mode a document is identified by (tenant, id), so all
+        // duplicate bookkeeping below keys on both; otherwise the bare ID is enough.
+        $tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument();
+        $duplicateKey = static fn (Document $doc): string => $tenantPerDocument
+            ? $doc->getTenant() . ':' . $doc->getId()
+            : $doc->getId();
+
+        // Deduplicate intra-batch documents when ignore mode is on.
+        // Keeps the first occurrence, mirrors upsertDocuments' seenIds check.
+        // Documents with an empty ID are never treated as duplicates of each other.
+        if ($ignore) {
+            $seenKeys = [];
+            $deduplicated = [];
+            foreach ($documents as $document) {
+                if ($document->getId() !== '') {
+                    $dedupKey = $duplicateKey($document);
+                    if (isset($seenKeys[$dedupKey])) {
+                        continue;
+                    }
+                    $seenKeys[$dedupKey] = true;
+                }
+                $deduplicated[] = $document;
+            }
+            $documents = $deduplicated;
+        }
+
+        // When ignore mode is on, pre-fetch the keys of documents that already exist so
+        // they are skipped for both relationship writes and the adapter insert.
+        $preExistingIds = [];
+        if ($ignore) {
+            if ($tenantPerDocument) {
+                $idsByTenant = [];
+                foreach ($documents as $doc) {
+                    // Skip empty IDs, matching the single-tenant branch below.
+                    if ($doc->getId() !== '') {
+                        $idsByTenant[$doc->getTenant()][] = $doc->getId();
+                    }
+                }
+                foreach ($idsByTenant as $tenant => $tenantIds) {
+                    $tenantIds = \array_values(\array_unique($tenantIds));
+                    foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
+                        $existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
+                            $collection->getId(),
+                            [
+                                Query::equal('$id', $idChunk),
+                                Query::select(['$id']),
+                                Query::limit(\count($idChunk)),
+                            ]
+                        ))));
+                        foreach ($existing as $doc) {
+                            $preExistingIds[$tenant . ':' . $doc->getId()] = true;
+                        }
+                    }
+                }
+            } else {
+                // Filter on !== '' explicitly: a bare array_filter() would also drop the ID '0'.
+                $inputIds = \array_values(\array_unique(\array_filter(
+                    \array_map(fn (Document $doc) => $doc->getId(), $documents),
+                    fn ($id) => $id !== '',
+                )));
+
+                foreach (\array_chunk($inputIds, \max(1, $this->maxQueryValues)) as $idChunk) {
+                    $existing = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
+                        $collection->getId(),
+                        [
+                            Query::equal('$id', $idChunk),
+                            Query::select(['$id']),
+                            Query::limit(\count($idChunk)),
+                        ]
+                    )));
+                    foreach ($existing as $doc) {
+                        $preExistingIds[$doc->getId()] = true;
+                    }
+                }
+            }
+        }
+
         foreach ($documents as $document) {
             $createdAt = $document->getCreatedAt();
             $updatedAt = $document->getUpdatedAt();
@@ -5693,15 +5772,29 @@ public function createDocuments(
             }
 
             if ($this->resolveRelationships) {
-                $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
+                // Pre-existing documents are skipped on insert, so their relationships must not be written either.
+                if (!isset($preExistingIds[$duplicateKey($document)])) {
+                    $document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
+                }
             }
 
             $document = $this->adapter->castingBefore($collection, $document);
         }
 
         foreach (\array_chunk($documents, $batchSize) as $chunk) {
-            $batch = $this->withTransaction(function () use ($collection, $chunk) {
-                return $this->adapter->createDocuments($collection, $chunk);
+            // Drop documents that already exist before handing the chunk to the adapter.
+            if ($ignore && !empty($preExistingIds)) {
+                $chunk = \array_values(\array_filter(
+                    $chunk,
+                    fn (Document $doc) => !isset($preExistingIds[$duplicateKey($doc)]),
+                ));
+                if (empty($chunk)) {
+                    continue;
+                }
+            }
+
+            $batch = $this->withTransaction(function () use ($collection, $chunk, $ignore) {
+                return $this->adapter->createDocuments($collection, $chunk, $ignore);
             });
 
             $batch = $this->adapter->getSequences($collection->getId(), $batch);
@@ -7116,18 +7209,61 @@ public function upsertDocumentsWithIncrease(
         $created = 0;
         $updated = 0;
         $seenIds = [];
-        foreach ($documents as $key => $document) {
-            if ($this->getSharedTables() && $this->getTenantPerDocument()) {
-                $old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument(
-                    $collection->getId(),
-                    $document->getId(),
-                ))));
+
+        // Batch-fetch existing documents with chunked find() calls instead of N individual getDocument() calls.
+        // Filter on !== '' explicitly: a bare array_filter() would also drop the ID '0',
+        // which would then fall back to an empty Document in the lookup below.
+        $ids = \array_filter(
+            \array_map(fn ($doc) => $doc->getId(), $documents),
+            fn ($id) => $id !== '',
+        );
+        $existingDocs = [];
+        $upsertTenantPerDocument = $this->getSharedTables() && $this->getTenantPerDocument();
+
+        if (!empty($ids)) {
+            $uniqueIds = \array_values(\array_unique($ids));
+
+            if ($upsertTenantPerDocument) {
+                // Group IDs by tenant and fetch each group separately
+                // Use composite key tenant:id to avoid cross-tenant collisions
+                $idsByTenant = [];
+                foreach ($documents as $doc) {
+                    if ($doc->getId() === '') {
+                        continue;
+                    }
+                    $tenant = $doc->getTenant();
+                    $idsByTenant[$tenant][] = $doc->getId();
+                }
+                foreach ($idsByTenant as $tenant => $tenantIds) {
+                    $tenantIds = \array_values(\array_unique($tenantIds));
+                    foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
+                        $fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
+                            $collection->getId(),
+                            [Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
+                        ))));
+                        foreach ($fetched as $doc) {
+                            $existingDocs[$tenant . ':' . $doc->getId()] = $doc;
+                        }
+                    }
+                }
             } else {
-                $old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument(
-                    $collection->getId(),
-                    $document->getId(),
-                )));
+                foreach (\array_chunk($uniqueIds, \max(1, $this->maxQueryValues)) as $idChunk) {
+                    $fetched = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
+                        $collection->getId(),
+                        [Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
+                    )));
+                    foreach ($fetched as $doc) {
+                        $existingDocs[$doc->getId()] = $doc;
+                    }
+                }
             }
+        }
+
+        foreach ($documents as $key => $document) {
+            $lookupKey = $upsertTenantPerDocument
+                ? $document->getTenant() . ':' . $document->getId()
+                : $document->getId();
+            $old = $existingDocs[$lookupKey] ?? new Document();
 
             // Extract operators early to avoid comparison issues
             $documentArray = $document->getArrayCopy();
diff --git a/src/Database/Mirror.php b/src/Database/Mirror.php
index f740cab3e..636d273dd 100644
--- a/src/Database/Mirror.php
+++ b/src/Database/Mirror.php
@@ -600,6 +600,7 @@ public function createDocuments(
         int $batchSize = self::INSERT_BATCH_SIZE,
         ?callable $onNext = null,
         ?callable $onError = null,
+        bool $ignore = false,
     ): int {
         $modified = $this->source->createDocuments(
             $collection,
@@ -607,6 +608,7 @@ public function createDocuments(
             $batchSize,
             $onNext,
             $onError,
+            $ignore,
         );
 
         if (
@@ -645,6 +647,7 @@ public function createDocuments(
                     $collection,
                     $clones,
                     $batchSize,
+                    ignore: $ignore,
                 )
             );
 
diff --git a/tests/e2e/Adapter/Scopes/DocumentTests.php b/tests/e2e/Adapter/Scopes/DocumentTests.php
index d16004d32..34a463200 100644
--- a/tests/e2e/Adapter/Scopes/DocumentTests.php
+++ b/tests/e2e/Adapter/Scopes/DocumentTests.php
@@ -7722,4 +7722,200 @@ public function testRegexInjection(): void
     //     }
     //     $database->deleteCollection($collectionName);
     // }
+
+    public function testCreateDocumentsIgnoreDuplicates(): void
+    {
+        /** @var Database $database */
+        $database = $this->getDatabase();
+
+        $database->createCollection(__FUNCTION__);
+        $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true);
+
+        // Insert initial documents
+        $database->createDocuments(__FUNCTION__, [
+            new Document([
+                '$id' => 'doc1',
+                'name' => 'Original A',
+                '$permissions' => [
+                    Permission::read(Role::any()),
+                    Permission::create(Role::any()),
+                ],
+            ]),
+            new Document([
+                '$id' => 'doc2',
+                'name' => 'Original B',
+                '$permissions' => [
+                    Permission::read(Role::any()),
+                    Permission::create(Role::any()),
+                ],
+            ]),
+        ]);
+
+        // Without ignore, duplicates should throw
+        try {
+            $database->createDocuments(__FUNCTION__, [
+                new Document([
+                    '$id' => 'doc1',
+                    'name' => 'Duplicate A',
+                    '$permissions' => [
+                        Permission::read(Role::any()),
+                        Permission::create(Role::any()),
+                    ],
+                ]),
+            ]);
+            $this->fail('Expected DuplicateException');
+        } catch (DuplicateException $e) {
+            $this->assertNotEmpty($e->getMessage());
+        }
+
+        // With ignore, duplicates should be silently skipped
+        $emittedIds = [];
+        $count = $database->createDocuments(__FUNCTION__, [
+            new Document([
+                '$id' => 'doc1',
+                'name' => 'Duplicate A',
+                '$permissions' => [
+                    Permission::read(Role::any()),
+                    Permission::create(Role::any()),
+                ],
+            ]),
+            new Document([
+                '$id' => 'doc3',
+                'name' => 'New C',
+                '$permissions' => [
+                    Permission::read(Role::any()),
+                    Permission::create(Role::any()),
+                ],
+            ]),
+        ], onNext: function (Document $doc) use (&$emittedIds) {
+            $emittedIds[] = $doc->getId();
+        }, ignore: true);
+
+        // Only doc3 was new, doc1 was skipped as duplicate
+        $this->assertSame(1, $count);
+        $this->assertCount(1, $emittedIds);
+        $this->assertSame('doc3', $emittedIds[0]);
+
+        // doc3 should exist, doc1 should retain original value
+        $doc1 = $database->getDocument(__FUNCTION__, 'doc1');
+        $this->assertSame('Original A', $doc1->getAttribute('name'));
+
+        $doc3 = $database->getDocument(__FUNCTION__, 'doc3');
+        $this->assertSame('New C', $doc3->getAttribute('name'));
+
+        // Total should be 3 (doc1, doc2, doc3)
+        $all = $database->find(__FUNCTION__);
+        $this->assertCount(3, $all);
+    }
+
+    public function testCreateDocumentsIgnoreIntraBatchDuplicates(): void
+    {
+        /** @var Database $database */
+        $database = $this->getDatabase();
+        $col = 'createDocsIgnoreIntraBatch';
+
+        $database->createCollection($col);
+        $database->createAttribute($col, 'name', Database::VAR_STRING, 128, true);
+
+        // Two docs with same ID in one batch — first wins, second is deduplicated
+        $emittedIds = [];
+        $count = $database->createDocuments($col, [
+            new Document([
+                '$id' => 'dup',
+                'name' => 'First',
+                '$permissions' => [
+                    Permission::read(Role::any()),
+                    Permission::create(Role::any()),
+                ],
+            ]),
+            new Document([
+                '$id' => 'dup',
+                'name' => 'Second',
+                '$permissions' => [
+                    Permission::read(Role::any()),
+                    Permission::create(Role::any()),
+                    Permission::update(Role::user('extra')),
+                ],
+            ]),
+            new Document([
+                '$id' => 'unique1',
+                'name' => 'Unique',
+                '$permissions' => [
+                    Permission::read(Role::any()),
+                    Permission::create(Role::any()),
+                ],
+            ]),
+        ], onNext: function (Document $doc) use (&$emittedIds) {
+            $emittedIds[] = $doc->getId();
+        }, ignore: true);
+
+        $this->assertSame(2, $count);
+        $this->assertCount(2, $emittedIds);
+
+        // First occurrence wins
+        $doc = $database->getDocument($col, 'dup');
+        $this->assertSame('First', $doc->getAttribute('name'));
+
+        // Second doc's extra permission should NOT exist (no ACL drift)
+        $perms = $doc->getPermissions();
+        foreach ($perms as $perm) {
+            $this->assertStringNotContainsString('extra', $perm);
+        }
+
+        // unique1 should exist
+        $unique = $database->getDocument($col, 'unique1');
+        $this->assertSame('Unique', $unique->getAttribute('name'));
+
+        // Total: 2 documents
+        $all = $database->find($col);
+        $this->assertCount(2, $all);
+    }
+
+    public function testCreateDocumentsIgnoreAllDuplicates(): void
+    {
+        /** @var Database $database */
+        $database = $this->getDatabase();
+
+        $database->createCollection(__FUNCTION__);
+        $database->createAttribute(__FUNCTION__, 'name', Database::VAR_STRING, 128, true);
+
+        // Insert initial document
+        $database->createDocuments(__FUNCTION__, [
+            new Document([
+                '$id' => 'existing',
+                'name' => 'Original',
+                '$permissions' => [
+                    Permission::read(Role::any()),
+                    Permission::create(Role::any()),
+                ],
+            ]),
+        ]);
+
+        // With ignore, inserting only duplicates should succeed with no new rows
+        $emittedIds = [];
+        $count = $database->createDocuments(__FUNCTION__, [
+            new Document([
+                '$id' => 'existing',
+                'name' => 'Duplicate',
+                '$permissions' => [
+                    Permission::read(Role::any()),
+                    Permission::create(Role::any()),
+                ],
+            ]),
+        ], onNext: function (Document $doc) use (&$emittedIds) {
+            $emittedIds[] = $doc->getId();
+        }, ignore: true);
+
+        // All duplicates skipped, nothing inserted
+        $this->assertSame(0, $count);
+        $this->assertSame([], $emittedIds);
+
+        // Original document should be unchanged
+        $doc = $database->getDocument(__FUNCTION__, 'existing');
+        $this->assertSame('Original', $doc->getAttribute('name'));
+
+        // Still only 1 document
+        $all = $database->find(__FUNCTION__);
+        $this->assertCount(1, $all);
+    }
 }