Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Database/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document
*
* @param Document $collection
* @param array<Document> $documents
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
*
* @return array<Document>
*
* @throws DatabaseException
*/
abstract public function createDocuments(Document $collection, array $documents): array;
abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array;

/**
* Update Document
Expand Down
36 changes: 34 additions & 2 deletions src/Database/Adapter/Mongo.php
Original file line number Diff line number Diff line change
Expand Up @@ -1460,11 +1460,11 @@ public function castingBefore(Document $collection, Document $document): Documen
* @throws DuplicateException
* @throws DatabaseException
*/
public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
$name = $this->getNamespace() . '_' . $this->filter($collection->getId());

$options = $this->getTransactionOptions();

$records = [];
$hasSequence = null;
$documents = \array_map(fn ($doc) => clone $doc, $documents);
Expand All @@ -1487,6 +1487,38 @@ public function createDocuments(Document $collection, array $documents): array
$records[] = $record;
}

// In ignore mode, pre-filter duplicates within the same session to avoid
// BulkWriteException which would abort the transaction.
if ($ignore && !empty($records)) {
$uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records));
if (!empty($uids)) {
$findOptions = $this->getTransactionOptions(['projection' => ['_uid' => 1]]);
$result = $this->client->find($name, ['_uid' => ['$in' => \array_values($uids)]], $findOptions);
$existingUids = [];
foreach ($result->cursor->firstBatch ?? [] as $doc) {
$existingUids[$doc->_uid] = true;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if (!empty($existingUids)) {
$filteredRecords = [];
$filteredDocuments = [];
foreach ($records as $i => $record) {
$uid = $record['_uid'] ?? '';
if (!isset($existingUids[$uid])) {
$filteredRecords[] = $record;
$filteredDocuments[] = $documents[$i];
}
}
$records = $filteredRecords;
$documents = $filteredDocuments;
}
}

if (empty($records)) {
return [];
}
}

try {
$documents = $this->client->insertMany($name, $records, $options);
} catch (MongoException $e) {
Expand Down
2 changes: 1 addition & 1 deletion src/Database/Adapter/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume
return $this->delegate(__FUNCTION__, \func_get_args());
}

public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
return $this->delegate(__FUNCTION__, \func_get_args());
}
Expand Down
29 changes: 29 additions & 0 deletions src/Database/Adapter/Postgres.php
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum
return $document;
}

protected function getInsertKeyword(bool $ignore): string
{
return 'INSERT INTO';
}

protected function getInsertSuffix(bool $ignore, string $table): string
{
if (!$ignore) {
return '';
}

$conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

protected function getInsertPermissionsSuffix(bool $ignore): string
{
if (!$ignore) {
return '';
}

$conflictTarget = $this->sharedTables
? '("_type", "_permission", "_document", "_tenant")'
: '("_type", "_permission", "_document")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}
Comment on lines +1373 to +1395
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 PostgreSQL ON CONFLICT conflict targets missing COLLATE utf8_ci_ai

Both getInsertSuffix and getInsertPermissionsSuffix produce ON CONFLICT clauses whose conflict targets do not match the actual unique indexes created by the Postgres adapter. PostgreSQL uses inference to find the unique index — when a unique index was built with a non-default collation (here utf8_ci_ai, which is deterministic = false), that same collation must be specified in the conflict target, otherwise PostgreSQL will throw:

there is no unique or exclusion constraint matching the ON CONFLICT specification

Actual unique indexes vs. conflict targets:

Table Actual index Conflict target in PR Issue
main (non-shared) ("_uid" COLLATE utf8_ci_ai) ("_uid") ❌ missing COLLATE
main (shared) ("_uid" COLLATE utf8_ci_ai, "_tenant") ("_uid", "_tenant") ❌ missing COLLATE on _uid
perms (non-shared) (_document COLLATE utf8_ci_ai, _type, _permission) ("_type", "_permission", "_document") ❌ missing COLLATE on _document
perms (shared) (_tenant, _document, _type, _permission) ("_type", "_permission", "_document", "_tenant") ✅ no collation in index

The fixes:

// getInsertSuffix – main document table
protected function getInsertSuffix(bool $ignore, string $table): string
{
    if (!$ignore) {
        return '';
    }

    $conflictTarget = $this->sharedTables
        ? '("_uid" COLLATE utf8_ci_ai, "_tenant")'
        : '("_uid" COLLATE utf8_ci_ai)';

    return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

// getInsertPermissionsSuffix – permissions table
protected function getInsertPermissionsSuffix(bool $ignore): string
{
    if (!$ignore) {
        return '';
    }

    $conflictTarget = $this->sharedTables
        ? '("_type", "_permission", "_document", "_tenant")'
        : '("_document" COLLATE utf8_ci_ai, "_type", "_permission")';

    return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

Without this fix, createDocuments(..., ignore: true) will always fail at the PostgreSQL level for non-shared tables (and shared tables' main document insert), raising a PDOException rather than silently ignoring duplicates.

Copy link
Copy Markdown
Contributor Author

@premtsd-code premtsd-code Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing upsertDocuments at [Postgres.php:1462] uses ("_uid") without COLLATE and works in production

https://github.com/utopia-php/database/blob/csv-import-upsert-v2/src/Database/Adapter/Postgres.php#L1462


/**
* @param string $tableName
* @param string $columns
Expand Down
37 changes: 33 additions & 4 deletions src/Database/Adapter/SQL.php
Original file line number Diff line number Diff line change
Expand Up @@ -2471,7 +2471,7 @@ protected function execute(mixed $stmt): bool
* @throws DuplicateException
* @throws \Throwable
*/
public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
if (empty($documents)) {
return $documents;
Expand Down Expand Up @@ -2573,8 +2573,9 @@ public function createDocuments(Document $collection, array $documents): array
$batchKeys = \implode(', ', $batchKeys);

$stmt = $this->getPDO()->prepare("
INSERT INTO {$this->getSQLTable($name)} {$columns}
{$this->getInsertKeyword($ignore)} {$this->getSQLTable($name)} {$columns}
VALUES {$batchKeys}
{$this->getInsertSuffix($ignore, $name)}
");

foreach ($bindValues as $key => $value) {
Expand All @@ -2588,8 +2589,9 @@ public function createDocuments(Document $collection, array $documents): array
$permissions = \implode(', ', $permissions);

$sqlPermissions = "
INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
VALUES {$permissions};
{$this->getInsertKeyword($ignore)} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
VALUES {$permissions}
{$this->getInsertPermissionsSuffix($ignore)}
";

$stmtPermissions = $this->getPDO()->prepare($sqlPermissions);
Expand All @@ -2608,6 +2610,33 @@ public function createDocuments(Document $collection, array $documents): array
return $documents;
}

/**
* Returns the INSERT keyword, optionally with IGNORE for duplicate handling.
* Override in adapter subclasses for DB-specific syntax.
*/
protected function getInsertKeyword(bool $ignore): string
{
return $ignore ? 'INSERT IGNORE INTO' : 'INSERT INTO';
}

/**
* Returns a suffix appended after VALUES clause for duplicate handling.
* Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING).
*/
protected function getInsertSuffix(bool $ignore, string $table): string
{
return '';
}

/**
* Returns a suffix for the permissions INSERT statement when ignoring duplicates.
* Override in adapter subclasses for DB-specific syntax.
*/
protected function getInsertPermissionsSuffix(bool $ignore): string
{
return '';
}

/**
* @param Document $collection
* @param string $attribute
Expand Down
5 changes: 5 additions & 0 deletions src/Database/Adapter/SQLite.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
*/
class SQLite extends MariaDB
{
protected function getInsertKeyword(bool $ignore): string
{
return $ignore ? 'INSERT OR IGNORE INTO' : 'INSERT INTO';
}

/**
* @inheritDoc
*/
Expand Down
151 changes: 138 additions & 13 deletions src/Database/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -5621,6 +5621,7 @@ public function createDocument(string $collection, Document $document): Document
* @param int $batchSize
* @param (callable(Document): void)|null $onNext
* @param (callable(Throwable): void)|null $onError
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
* @return int
* @throws AuthorizationException
* @throws StructureException
Expand All @@ -5633,6 +5634,7 @@ public function createDocuments(
int $batchSize = self::INSERT_BATCH_SIZE,
?callable $onNext = null,
?callable $onError = null,
bool $ignore = false,
): int {
if (!$this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) {
throw new DatabaseException('Shared tables must be enabled if tenant per document is enabled.');
Expand All @@ -5653,6 +5655,76 @@ public function createDocuments(
$time = DateTime::now();
$modified = 0;

$tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument();

// Deduplicate intra-batch documents by ID when ignore mode is on.
// Keeps the first occurrence, mirrors upsertDocuments' seenIds check.
// In tenant-per-document mode, dedupe by tenant+id to allow same ID across tenants.
if ($ignore) {
$seenIds = [];
$deduplicated = [];
foreach ($documents as $document) {
$docId = $document->getId();
if ($docId !== '') {
$dedupeKey = $tenantPerDocument
? $document->getTenant() . ':' . $docId
: $docId;
if (isset($seenIds[$dedupeKey])) {
continue;
}
$seenIds[$dedupeKey] = true;
}
$deduplicated[] = $document;
}
$documents = $deduplicated;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// When ignore mode is on and relationships are being resolved,
// pre-fetch existing document IDs so we skip relationship writes for duplicates
$preExistingIds = [];
if ($ignore) {
if ($tenantPerDocument) {
$idsByTenant = [];
foreach ($documents as $doc) {
$idsByTenant[$doc->getTenant()][] = $doc->getId();
}
foreach ($idsByTenant as $tenant => $tenantIds) {
$tenantIds = \array_values(\array_unique(\array_filter($tenantIds)));
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
Comment thread
coderabbitai[bot] marked this conversation as resolved.
$collection->getId(),
[
Query::equal('$id', $idChunk),
Query::select(['$id']),
Query::limit(\count($idChunk)),
]
))));
foreach ($existing as $doc) {
$preExistingIds[$tenant . ':' . $doc->getId()] = true;
}
}
}
} else {
$inputIds = \array_values(\array_unique(\array_filter(
\array_map(fn (Document $doc) => $doc->getId(), $documents)
)));

foreach (\array_chunk($inputIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$existing = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[
Query::equal('$id', $idChunk),
Query::select(['$id']),
Query::limit(\count($idChunk)),
]
)));
foreach ($existing as $doc) {
$preExistingIds[$doc->getId()] = true;
}
}
}
}

foreach ($documents as $document) {
$createdAt = $document->getCreatedAt();
$updatedAt = $document->getUpdatedAt();
Expand Down Expand Up @@ -5693,15 +5765,33 @@ public function createDocuments(
}

if ($this->resolveRelationships) {
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
$preExistKey = $tenantPerDocument
? $document->getTenant() . ':' . $document->getId()
: $document->getId();

if (!isset($preExistingIds[$preExistKey])) {
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
}
}

$document = $this->adapter->castingBefore($collection, $document);
}

foreach (\array_chunk($documents, $batchSize) as $chunk) {
$batch = $this->withTransaction(function () use ($collection, $chunk) {
return $this->adapter->createDocuments($collection, $chunk);
if ($ignore && !empty($preExistingIds)) {
$chunk = \array_values(\array_filter($chunk, function (Document $doc) use ($preExistingIds, $tenantPerDocument) {
$key = $tenantPerDocument
? $doc->getTenant() . ':' . $doc->getId()
: $doc->getId();
return !isset($preExistingIds[$key]);
}));
if (empty($chunk)) {
continue;
}
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.

$batch = $this->withTransaction(function () use ($collection, $chunk, $ignore) {
return $this->adapter->createDocuments($collection, $chunk, $ignore);
});

$batch = $this->adapter->getSequences($collection->getId(), $batch);
Expand Down Expand Up @@ -7116,18 +7206,53 @@ public function upsertDocumentsWithIncrease(
$created = 0;
$updated = 0;
$seenIds = [];
foreach ($documents as $key => $document) {
if ($this->getSharedTables() && $this->getTenantPerDocument()) {
$old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument(
$collection->getId(),
$document->getId(),
))));

// Batch-fetch existing documents in one query instead of N individual getDocument() calls
$ids = \array_filter(\array_map(fn ($doc) => $doc->getId(), $documents));
$existingDocs = [];
$upsertTenantPerDocument = $this->getSharedTables() && $this->getTenantPerDocument();

if (!empty($ids)) {
$uniqueIds = \array_values(\array_unique($ids));

if ($upsertTenantPerDocument) {
// Group IDs by tenant and fetch each group separately
// Use composite key tenant:id to avoid cross-tenant collisions
$idsByTenant = [];
foreach ($documents as $doc) {
$tenant = $doc->getTenant();
$idsByTenant[$tenant][] = $doc->getId();
}
foreach ($idsByTenant as $tenant => $tenantIds) {
$tenantIds = \array_values(\array_unique(\array_filter($tenantIds)));
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
))));
foreach ($fetched as $doc) {
$existingDocs[$tenant . ':' . $doc->getId()] = $doc;
}
}
}
} else {
$old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument(
$collection->getId(),
$document->getId(),
)));
foreach (\array_chunk($uniqueIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$fetched = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
)));
foreach ($fetched as $doc) {
$existingDocs[$doc->getId()] = $doc;
}
}
}
}

foreach ($documents as $key => $document) {
$lookupKey = $upsertTenantPerDocument
? $document->getTenant() . ':' . $document->getId()
: $document->getId();
$old = $existingDocs[$lookupKey] ?? new Document();

// Extract operators early to avoid comparison issues
$documentArray = $document->getArrayCopy();
Expand Down
Loading
Loading