Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Database/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document
*
* @param Document $collection
* @param array<Document> $documents
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
*
* @return array<Document>
*
* @throws DatabaseException
*/
abstract public function createDocuments(Document $collection, array $documents): array;
abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array;

/**
* Update Document
Expand Down
22 changes: 19 additions & 3 deletions src/Database/Adapter/Mongo.php
Original file line number Diff line number Diff line change
Expand Up @@ -1460,11 +1460,18 @@ public function castingBefore(Document $collection, Document $document): Documen
* @throws DuplicateException
* @throws DatabaseException
*/
public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
$name = $this->getNamespace() . '_' . $this->filter($collection->getId());

$options = $this->getTransactionOptions();
if ($ignore) {
// Run outside transaction — MongoDB aborts transactions on any write error,
// so ordered:false + session would roll back even successfully inserted docs.
$options = ['ordered' => false];
} else {
$options = $this->getTransactionOptions();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don't silently escape the active transaction.

When $ignore is true this write no longer carries the current session, so a caller that is already inside a transaction can still end up committing these documents even if the surrounding transaction later rolls back. Failing fast here is safer than silently weakening atomicity.

🔒 Suggested guard
-        if ($ignore) {
+        if ($ignore && $this->inTransaction > 0) {
+            throw new DatabaseException('createDocuments(ignore: true) is not supported inside MongoDB transactions');
+        }
+
+        if ($ignore) {
             // Run outside transaction — MongoDB aborts transactions on any write error,
             // so ordered:false + session would roll back even successfully inserted docs.
             $options = ['ordered' => false];
         } else {
             $options = $this->getTransactionOptions();
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if ($ignore) {
// Run outside transaction — MongoDB aborts transactions on any write error,
// so ordered:false + session would roll back even successfully inserted docs.
$options = ['ordered' => false];
} else {
$options = $this->getTransactionOptions();
}
if ($ignore && $this->inTransaction > 0) {
throw new DatabaseException('createDocuments(ignore: true) is not supported inside MongoDB transactions');
}
if ($ignore) {
// Run outside transaction — MongoDB aborts transactions on any write error,
// so ordered:false + session would roll back even successfully inserted docs.
$options = ['ordered' => false];
} else {
$options = $this->getTransactionOptions();
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Database/Adapter/Mongo.php` around lines 1467 - 1473, The current branch
sets $options to ordered:false when $ignore is true, which drops the session and
silently escapes any active transaction; instead add a guard that detects an
active transaction/session (e.g. check $this->getTransactionOptions(),
$this->clientSession or an existing helper like hasActiveTransaction()) and
throw an exception when $ignore is true while a transaction is active, so
callers fail fast rather than weakening atomicity; implement this check before
altering $options and reference $ignore, $options and getTransactionOptions()
when adding the guard.


$records = [];
$hasSequence = null;
$documents = \array_map(fn ($doc) => clone $doc, $documents);
Expand All @@ -1490,7 +1497,16 @@ public function createDocuments(Document $collection, array $documents): array
try {
$documents = $this->client->insertMany($name, $records, $options);
} catch (MongoException $e) {
throw $this->processException($e);
$processed = $this->processException($e);

if ($ignore && $processed instanceof DuplicateException) {
// Race condition: a doc was inserted between pre-filter and insertMany.
// With ordered:false outside transaction, non-duplicate inserts persist.
// Return empty — we cannot determine which docs succeeded without querying.
return [];
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't report the raced batch as empty.

With ordered: false, inserts before the duplicate can still persist. Returning [] here makes callers treat the batch as a no-op, so follow-up work for actually inserted documents can be skipped even though those writes already committed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Database/Adapter/Mongo.php` around lines 1499 - 1507, The catch for
MongoException in the insertMany path currently returns an empty array when
$ignore is true and processException($e) yields a DuplicateException, which
incorrectly signals “no writes” even though with ordered:false some docs may
have persisted; instead, remove the "return []" behavior and either rethrow the
original exception ($e) or propagate the processed exception so callers can
decide to query which docs succeeded. Update the catch block in the insertMany
flow (referencing processException(), DuplicateException, $ignore, and the
insertMany/ordered:false behavior) to avoid silently returning an empty result
and ensure the exception is propagated for callers to detect/handle partial
success.


throw $processed;
}

foreach ($documents as $index => $document) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Race-condition path silently drops onNext callbacks for successfully-inserted documents

When a DuplicateException is caught here (a race condition where another writer inserted a document between the pre-filter find and insertMany), the method returns []. Because MongoDB used ordered: false, some non-duplicate documents in the batch were persisted, but Database::createDocuments will receive an empty $batch and:

  1. Call $onNext for none of those documents.
  2. Not increment $modified for them.

The comment in the code acknowledges this, but callers of createDocuments(..., ignore: true) may not be aware. Consider documenting this limitation in the Database::createDocuments docblock, or performing a follow-up find to discover which documents actually landed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Known limitation, documented in code comments. This only triggers on a race condition — the pre-fetch already filters known duplicates before they reach the adapter. The utopia-php/mongo client does not expose BulkWriteException::getWriteResult(), so recovering partial success info would require changes to the client library. For the migration use case (single writer), this race cannot occur.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Known limitation. This only triggers on a race condition — pre-fetch filters known duplicates before reaching the adapter. The utopia-php/mongo client does not expose BulkWriteException::getWriteResult(), so recovering partial success info is not possible from this layer. Needs a fix in utopia-php/mongo to expose write results from insertMany — tracked for follow-up there. For the migration use case (single writer), this race cannot occur.

Expand Down
2 changes: 1 addition & 1 deletion src/Database/Adapter/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume
return $this->delegate(__FUNCTION__, \func_get_args());
}

public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
return $this->delegate(__FUNCTION__, \func_get_args());
}
Expand Down
29 changes: 29 additions & 0 deletions src/Database/Adapter/Postgres.php
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum
return $document;
}

protected function getInsertKeyword(bool $ignore): string
{
return 'INSERT INTO';
}

protected function getInsertSuffix(bool $ignore, string $table): string
{
if (!$ignore) {
return '';
}

$conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

protected function getInsertPermissionsSuffix(bool $ignore): string
{
if (!$ignore) {
return '';
}

$conflictTarget = $this->sharedTables
? '("_type", "_permission", "_document", "_tenant")'
: '("_type", "_permission", "_document")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}
Comment on lines +1373 to +1395
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 PostgreSQL ON CONFLICT conflict targets missing COLLATE utf8_ci_ai

Both getInsertSuffix and getInsertPermissionsSuffix produce ON CONFLICT clauses whose conflict targets do not match the actual unique indexes created by the Postgres adapter. PostgreSQL uses inference to find the unique index — when a unique index was built with a non-default collation (here utf8_ci_ai, which is deterministic = false), that same collation must be specified in the conflict target, otherwise PostgreSQL will throw:

there is no unique or exclusion constraint matching the ON CONFLICT specification

Actual unique indexes vs. conflict targets:

Table Actual index Conflict target in PR Issue
main (non-shared) ("_uid" COLLATE utf8_ci_ai) ("_uid") ❌ missing COLLATE
main (shared) ("_uid" COLLATE utf8_ci_ai, "_tenant") ("_uid", "_tenant") ❌ missing COLLATE on _uid
perms (non-shared) (_document COLLATE utf8_ci_ai, _type, _permission) ("_type", "_permission", "_document") ❌ missing COLLATE on _document
perms (shared) (_tenant, _document, _type, _permission) ("_type", "_permission", "_document", "_tenant") ✅ no collation in index

The fixes:

// getInsertSuffix – main document table
protected function getInsertSuffix(bool $ignore, string $table): string
{
    if (!$ignore) {
        return '';
    }

    $conflictTarget = $this->sharedTables
        ? '("_uid" COLLATE utf8_ci_ai, "_tenant")'
        : '("_uid" COLLATE utf8_ci_ai)';

    return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

// getInsertPermissionsSuffix – permissions table
protected function getInsertPermissionsSuffix(bool $ignore): string
{
    if (!$ignore) {
        return '';
    }

    $conflictTarget = $this->sharedTables
        ? '("_type", "_permission", "_document", "_tenant")'
        : '("_document" COLLATE utf8_ci_ai, "_type", "_permission")';

    return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

Without this fix, createDocuments(..., ignore: true) will always fail at the PostgreSQL level for non-shared tables (and shared tables' main document insert), raising a PDOException rather than silently ignoring duplicates.

Copy link
Copy Markdown
Contributor Author

@premtsd-code premtsd-code Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing upsertDocuments at [Postgres.php:1462] uses ("_uid") without COLLATE and works in production

https://github.com/utopia-php/database/blob/csv-import-upsert-v2/src/Database/Adapter/Postgres.php#L1462


/**
* @param string $tableName
* @param string $columns
Expand Down
37 changes: 33 additions & 4 deletions src/Database/Adapter/SQL.php
Original file line number Diff line number Diff line change
Expand Up @@ -2471,7 +2471,7 @@ protected function execute(mixed $stmt): bool
* @throws DuplicateException
* @throws \Throwable
*/
public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
if (empty($documents)) {
return $documents;
Expand Down Expand Up @@ -2573,8 +2573,9 @@ public function createDocuments(Document $collection, array $documents): array
$batchKeys = \implode(', ', $batchKeys);

$stmt = $this->getPDO()->prepare("
INSERT INTO {$this->getSQLTable($name)} {$columns}
{$this->getInsertKeyword($ignore)} {$this->getSQLTable($name)} {$columns}
VALUES {$batchKeys}
{$this->getInsertSuffix($ignore, $name)}
");

foreach ($bindValues as $key => $value) {
Expand All @@ -2588,8 +2589,9 @@ public function createDocuments(Document $collection, array $documents): array
$permissions = \implode(', ', $permissions);

$sqlPermissions = "
INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
VALUES {$permissions};
{$this->getInsertKeyword($ignore)} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
VALUES {$permissions}
{$this->getInsertPermissionsSuffix($ignore)}
";

$stmtPermissions = $this->getPDO()->prepare($sqlPermissions);
Expand All @@ -2608,6 +2610,33 @@ public function createDocuments(Document $collection, array $documents): array
return $documents;
}

/**
* Returns the INSERT keyword, optionally with IGNORE for duplicate handling.
* Override in adapter subclasses for DB-specific syntax.
*/
protected function getInsertKeyword(bool $ignore): string
{
return $ignore ? 'INSERT IGNORE INTO' : 'INSERT INTO';
}

/**
* Returns a suffix appended after VALUES clause for duplicate handling.
* Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING).
*/
protected function getInsertSuffix(bool $ignore, string $table): string
{
return '';
}

/**
* Returns a suffix for the permissions INSERT statement when ignoring duplicates.
* Override in adapter subclasses for DB-specific syntax.
*/
protected function getInsertPermissionsSuffix(bool $ignore): string
{
return '';
}

/**
* @param Document $collection
* @param string $attribute
Expand Down
5 changes: 5 additions & 0 deletions src/Database/Adapter/SQLite.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
*/
class SQLite extends MariaDB
{
protected function getInsertKeyword(bool $ignore): string
{
return $ignore ? 'INSERT OR IGNORE INTO' : 'INSERT INTO';
}

/**
* @inheritDoc
*/
Expand Down
146 changes: 133 additions & 13 deletions src/Database/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -5621,6 +5621,7 @@ public function createDocument(string $collection, Document $document): Document
* @param int $batchSize
* @param (callable(Document): void)|null $onNext
* @param (callable(Throwable): void)|null $onError
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
* @return int
* @throws AuthorizationException
* @throws StructureException
Expand All @@ -5633,6 +5634,7 @@ public function createDocuments(
int $batchSize = self::INSERT_BATCH_SIZE,
?callable $onNext = null,
?callable $onError = null,
bool $ignore = false,
): int {
if (!$this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) {
throw new DatabaseException('Shared tables must be enabled if tenant per document is enabled.');
Expand All @@ -5653,6 +5655,71 @@ public function createDocuments(
$time = DateTime::now();
$modified = 0;

// Deduplicate intra-batch documents by ID when ignore mode is on.
// Keeps the first occurrence, mirrors upsertDocuments' seenIds check.
if ($ignore) {
$seenIds = [];
$deduplicated = [];
foreach ($documents as $document) {
$docId = $document->getId();
if ($docId !== '' && isset($seenIds[$docId])) {
continue;
}
if ($docId !== '') {
$seenIds[$docId] = true;
}
$deduplicated[] = $document;
}
$documents = $deduplicated;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// When ignore mode is on and relationships are being resolved,
// pre-fetch existing document IDs so we skip relationship writes for duplicates
$preExistingIds = [];
$tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument();
if ($ignore) {
if ($tenantPerDocument) {
$idsByTenant = [];
foreach ($documents as $doc) {
$idsByTenant[$doc->getTenant()][] = $doc->getId();
}
foreach ($idsByTenant as $tenant => $tenantIds) {
$tenantIds = \array_values(\array_unique($tenantIds));
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
Comment thread
coderabbitai[bot] marked this conversation as resolved.
$collection->getId(),
[
Query::equal('$id', $idChunk),
Query::select(['$id']),
Query::limit(\count($idChunk)),
]
))));
foreach ($existing as $doc) {
$preExistingIds[$tenant . ':' . $doc->getId()] = true;
}
}
}
} else {
$inputIds = \array_values(\array_unique(\array_filter(
\array_map(fn (Document $doc) => $doc->getId(), $documents)
)));

foreach (\array_chunk($inputIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$existing = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[
Query::equal('$id', $idChunk),
Query::select(['$id']),
Query::limit(\count($idChunk)),
]
)));
foreach ($existing as $doc) {
$preExistingIds[$doc->getId()] = true;
}
}
}
}

foreach ($documents as $document) {
$createdAt = $document->getCreatedAt();
$updatedAt = $document->getUpdatedAt();
Expand Down Expand Up @@ -5693,15 +5760,33 @@ public function createDocuments(
}

if ($this->resolveRelationships) {
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
$preExistKey = $tenantPerDocument
? $document->getTenant() . ':' . $document->getId()
: $document->getId();

if (!isset($preExistingIds[$preExistKey])) {
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
}
}

$document = $this->adapter->castingBefore($collection, $document);
}

foreach (\array_chunk($documents, $batchSize) as $chunk) {
$batch = $this->withTransaction(function () use ($collection, $chunk) {
return $this->adapter->createDocuments($collection, $chunk);
if ($ignore && !empty($preExistingIds)) {
$chunk = \array_values(\array_filter($chunk, function (Document $doc) use ($preExistingIds, $tenantPerDocument) {
$key = $tenantPerDocument
? $doc->getTenant() . ':' . $doc->getId()
: $doc->getId();
return !isset($preExistingIds[$key]);
}));
if (empty($chunk)) {
continue;
}
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.

$batch = $this->withTransaction(function () use ($collection, $chunk, $ignore) {
return $this->adapter->createDocuments($collection, $chunk, $ignore);
});

$batch = $this->adapter->getSequences($collection->getId(), $batch);
Expand Down Expand Up @@ -7116,18 +7201,53 @@ public function upsertDocumentsWithIncrease(
$created = 0;
$updated = 0;
$seenIds = [];
foreach ($documents as $key => $document) {
if ($this->getSharedTables() && $this->getTenantPerDocument()) {
$old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument(
$collection->getId(),
$document->getId(),
))));

// Batch-fetch existing documents in one query instead of N individual getDocument() calls
$ids = \array_filter(\array_map(fn ($doc) => $doc->getId(), $documents));
$existingDocs = [];
$upsertTenantPerDocument = $this->getSharedTables() && $this->getTenantPerDocument();

if (!empty($ids)) {
$uniqueIds = \array_values(\array_unique($ids));

if ($upsertTenantPerDocument) {
// Group IDs by tenant and fetch each group separately
// Use composite key tenant:id to avoid cross-tenant collisions
$idsByTenant = [];
foreach ($documents as $doc) {
$tenant = $doc->getTenant();
$idsByTenant[$tenant][] = $doc->getId();
}
foreach ($idsByTenant as $tenant => $tenantIds) {
$tenantIds = \array_values(\array_unique($tenantIds));
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
))));
foreach ($fetched as $doc) {
$existingDocs[$tenant . ':' . $doc->getId()] = $doc;
}
}
}
} else {
$old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument(
$collection->getId(),
$document->getId(),
)));
foreach (\array_chunk($uniqueIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$fetched = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
)));
foreach ($fetched as $doc) {
$existingDocs[$doc->getId()] = $doc;
}
}
}
}

foreach ($documents as $key => $document) {
$lookupKey = $upsertTenantPerDocument
? $document->getTenant() . ':' . $document->getId()
: $document->getId();
$old = $existingDocs[$lookupKey] ?? new Document();

// Extract operators early to avoid comparison issues
$documentArray = $document->getArrayCopy();
Expand Down
3 changes: 3 additions & 0 deletions src/Database/Mirror.php
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,15 @@ public function createDocuments(
int $batchSize = self::INSERT_BATCH_SIZE,
?callable $onNext = null,
?callable $onError = null,
bool $ignore = false,
): int {
$modified = $this->source->createDocuments(
$collection,
$documents,
$batchSize,
$onNext,
$onError,
$ignore,
);

if (
Expand Down Expand Up @@ -645,6 +647,7 @@ public function createDocuments(
$collection,
$clones,
$batchSize,
ignore: $ignore,
)
);

Expand Down
Loading
Loading