3 changes: 2 additions & 1 deletion src/Database/Adapter.php
@@ -729,12 +729,13 @@ abstract public function createDocument(Document $collection, Document $document
*
* @param Document $collection
* @param array<Document> $documents
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
*
* @return array<Document>
*
* @throws DatabaseException
*/
abstract public function createDocuments(Document $collection, array $documents): array;
abstract public function createDocuments(Document $collection, array $documents, bool $ignore = false): array;

/**
* Update Document
22 changes: 19 additions & 3 deletions src/Database/Adapter/Mongo.php
@@ -1460,11 +1460,18 @@ public function castingBefore(Document $collection, Document $document): Documen
* @throws DuplicateException
* @throws DatabaseException
*/
public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
$name = $this->getNamespace() . '_' . $this->filter($collection->getId());

$options = $this->getTransactionOptions();
if ($ignore) {
// Run outside transaction — MongoDB aborts transactions on any write error,
// so ordered:false + session would roll back even successfully inserted docs.
$options = ['ordered' => false];
} else {
$options = $this->getTransactionOptions();
}

$records = [];
$hasSequence = null;
$documents = \array_map(fn ($doc) => clone $doc, $documents);
@@ -1490,7 +1497,16 @@ public function createDocuments(Document $collection, array $documents): array
try {
$documents = $this->client->insertMany($name, $records, $options);
} catch (MongoException $e) {
throw $this->processException($e);
$processed = $this->processException($e);

if ($ignore && $processed instanceof DuplicateException) {
// Race condition: a doc was inserted between pre-filter and insertMany.
// With ordered:false outside transaction, non-duplicate inserts persist.
// Return empty — we cannot determine which docs succeeded without querying.
return [];
}
Comment on lines 1499 to +1507
P1 onNext never fires for successfully-inserted documents in a mixed-duplicate batch

With ordered: false, MongoDB processes every document in the batch and only raises BulkWriteException after completing all writes — meaning some documents may have been genuinely persisted before the exception. When the catch block returns [], Database.php iterates that empty array and calls onNext zero times, silently swallowing results for every document actually written in that batch.

Callers that use onNext to stream processed results (cache warming, post-insert hooks, progress tracking) will miss every successfully inserted document from any batch that contained at least one duplicate — which is the common case for ignore: true bulk-loads.

A more complete fix would recover succeeded writes from the exception itself via $e->getWriteResult()->getInsertedIds() to re-fetch and return only the actually-inserted documents, rather than returning [] unconditionally.

Contributor Author

Known limitation. This only triggers on a race condition — pre-fetch already filters known duplicates before the adapter. The utopia-php/mongo client does not expose BulkWriteException::getWriteResult(), so we cannot recover which docs succeeded in a mixed batch. For the migration use case (single writer), this race cannot occur. Documented as accepted behavior.


throw $processed;
}

foreach ($documents as $index => $document) {
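The recovery path suggested in the review thread above can be sketched against the official mongodb/mongodb driver, whose `BulkWriteException` does expose the partial write result. This is a hypothetical illustration, not the PR's code: per the author's reply, the utopia-php/mongo client wrapped by this adapter does not expose `getWriteResult()`, and the function name below is invented for the sketch.

```php
use MongoDB\Collection;
use MongoDB\Driver\Exception\BulkWriteException;

/**
 * Hypothetical sketch: recover the successfully-inserted subset of a
 * mixed-duplicate batch. Assumes the official mongodb/mongodb driver;
 * the utopia-php/mongo client used by this adapter does not expose
 * the write result on its exception type.
 */
function insertManyIgnoringDuplicates(Collection $collection, array $records): array
{
    try {
        $collection->insertMany($records, ['ordered' => false]);
        return $records; // no write errors: every document was inserted
    } catch (BulkWriteException $e) {
        $failed = [];
        foreach ($e->getWriteResult()->getWriteErrors() as $error) {
            if ($error->getCode() !== 11000) {
                throw $e; // only duplicate-key (code 11000) errors are ignorable
            }
            $failed[$error->getIndex()] = true;
        }
        // With ordered:false the server attempted every document, so any
        // index without a write error was genuinely persisted.
        return array_values(array_diff_key($records, $failed));
    }
}
```

With this shape, `onNext` would fire for every persisted document even when the batch contained duplicates, instead of the empty-array fallback the PR uses.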
2 changes: 1 addition & 1 deletion src/Database/Adapter/Pool.php
@@ -268,7 +268,7 @@ public function createDocument(Document $collection, Document $document): Docume
return $this->delegate(__FUNCTION__, \func_get_args());
}

public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
return $this->delegate(__FUNCTION__, \func_get_args());
}
29 changes: 29 additions & 0 deletions src/Database/Adapter/Postgres.php
@@ -1365,6 +1365,35 @@ public function updateDocument(Document $collection, string $id, Document $docum
return $document;
}

protected function getInsertKeyword(bool $ignore): string
{
return 'INSERT INTO';
}

protected function getInsertSuffix(bool $ignore, string $table): string
{
if (!$ignore) {
return '';
}

$conflictTarget = $this->sharedTables ? '("_uid", "_tenant")' : '("_uid")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}

protected function getInsertPermissionsSuffix(bool $ignore): string
{
if (!$ignore) {
return '';
}

$conflictTarget = $this->sharedTables
? '("_type", "_permission", "_document", "_tenant")'
: '("_type", "_permission", "_document")';

return "ON CONFLICT {$conflictTarget} DO NOTHING";
}
Comment on lines +1373 to +1395
P1 ON CONFLICT targets omit COLLATE utf8_ci_ai, breaking index inference on Postgres

The unique indexes created in createCollection use COLLATE utf8_ci_ai on "_uid" (documents table, both shared and non-shared modes) and on _document (non-shared permissions table). PostgreSQL's ON CONFLICT (col_list) inference requires an exact column match — including collation — to identify which unique constraint to use. The generated conflict targets omit the collation, so Postgres will throw:

ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification

This means every createDocuments(..., ignore: true) call will fail on Postgres.

getInsertSuffix (lines 1373–1382): The unique index on the documents table is "_uid" COLLATE utf8_ci_ai [, "_tenant"], but the conflict target is ("_uid", "_tenant") / ("_uid") with no collation.

getInsertPermissionsSuffix (lines 1384–1395, non-shared branch): The unique index on _perms is (_document COLLATE utf8_ci_ai, _type, _permission), but the conflict target is ("_type", "_permission", "_document") with no collation.

The collation must be specified in the inference list to match the actual index. For example:

// getInsertSuffix — non-shared
$conflictTarget = '("_uid" COLLATE utf8_ci_ai)';

// getInsertSuffix — shared
$conflictTarget = '("_uid" COLLATE utf8_ci_ai, "_tenant")';

// getInsertPermissionsSuffix — non-shared
$conflictTarget = '("_document" COLLATE utf8_ci_ai, "_type", "_permission")';

Contributor Author
Not an issue. The existing upsertDocuments method (Postgres.php:1462) uses ("_uid") without COLLATE and works in production. Postgres infers the correct unique index when there is only one matching index on the column set — which is the case here. All Postgres ignore tests pass (verified across all 13 adapter configurations).


/**
* @param string $tableName
* @param string $columns
37 changes: 33 additions & 4 deletions src/Database/Adapter/SQL.php
@@ -2471,7 +2471,7 @@ protected function execute(mixed $stmt): bool
* @throws DuplicateException
* @throws \Throwable
*/
public function createDocuments(Document $collection, array $documents): array
public function createDocuments(Document $collection, array $documents, bool $ignore = false): array
{
if (empty($documents)) {
return $documents;
@@ -2573,8 +2573,9 @@ public function createDocuments(Document $collection, array $documents): array
$batchKeys = \implode(', ', $batchKeys);

$stmt = $this->getPDO()->prepare("
INSERT INTO {$this->getSQLTable($name)} {$columns}
{$this->getInsertKeyword($ignore)} {$this->getSQLTable($name)} {$columns}
VALUES {$batchKeys}
{$this->getInsertSuffix($ignore, $name)}
");

foreach ($bindValues as $key => $value) {
@@ -2588,8 +2589,9 @@ public function createDocuments(Document $collection, array $documents): array
$permissions = \implode(', ', $permissions);

$sqlPermissions = "
INSERT INTO {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
VALUES {$permissions};
{$this->getInsertKeyword($ignore)} {$this->getSQLTable($name . '_perms')} (_type, _permission, _document {$tenantColumn})
VALUES {$permissions}
{$this->getInsertPermissionsSuffix($ignore)}
";

$stmtPermissions = $this->getPDO()->prepare($sqlPermissions);
@@ -2608,6 +2610,33 @@ public function createDocuments(Document $collection, array $documents): array
return $documents;
}

/**
* Returns the INSERT keyword, optionally with IGNORE for duplicate handling.
* Override in adapter subclasses for DB-specific syntax.
*/
protected function getInsertKeyword(bool $ignore): string
{
return $ignore ? 'INSERT IGNORE INTO' : 'INSERT INTO';
Comment on lines +2617 to +2619
P1 INSERT IGNORE INTO silences more than just duplicates

MySQL/MariaDB's INSERT IGNORE INTO downgrades all ignorable errors to warnings — not only duplicate-key violations. This includes data truncation (values silently adjusted to fit the column), NOT NULL violations (the column's implicit default is inserted instead), and foreign-key violations (the offending row is silently skipped). The PR intent is "silent duplicate handling", but INSERT IGNORE means a corrupt or structurally invalid row could be silently inserted with wrong data instead of raising an error.

For strict duplicate-only suppression on MariaDB/MySQL, consider catching the specific PDO error code 1062 in the exception handler (similar to how the Mongo adapter does it), or using INSERT INTO … ON DUPLICATE KEY UPDATE <col> = <col> as a no-op.

Contributor Author
Documents are validated by StructureValidator before reaching the adapter (Database.php:5731-5741), which checks types, sizes, required fields, and datetime ranges. Appwrite does not use foreign keys. The only error INSERT IGNORE can suppress at this layer is the duplicate key — which is the intended behavior. The alternative (ON DUPLICATE KEY UPDATE col=col) adds trigger side effects and changes rowCount() semantics.

}

/**
* Returns a suffix appended after VALUES clause for duplicate handling.
* Override in adapter subclasses (e.g., Postgres uses ON CONFLICT DO NOTHING).
*/
protected function getInsertSuffix(bool $ignore, string $table): string
{
return '';
}

/**
* Returns a suffix for the permissions INSERT statement when ignoring duplicates.
* Override in adapter subclasses for DB-specific syntax.
*/
protected function getInsertPermissionsSuffix(bool $ignore): string
{
return '';
}

/**
* @param Document $collection
* @param string $attribute
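For reference, the reviewer's duplicate-only alternative for MariaDB/MySQL could look like the following sketch. The function name is hypothetical; 1062 is MySQL/MariaDB's ER_DUP_ENTRY, which PDO surfaces in `errorInfo[1]` when the statement runs in exception mode.

```php
/**
 * Sketch of the reviewer's narrow alternative: a plain INSERT plus a
 * targeted catch, so only duplicate-key violations are suppressed.
 * 1062 = ER_DUP_ENTRY; PDO exposes the driver error code in errorInfo[1].
 */
function executeIgnoringDuplicates(\PDOStatement $stmt, bool $ignore): bool
{
    try {
        return $stmt->execute();
    } catch (\PDOException $e) {
        if ($ignore && ($e->errorInfo[1] ?? null) === 1062) {
            return false; // duplicate row suppressed; nothing inserted
        }
        throw $e; // truncation, NOT NULL, and other errors still propagate
    }
}
```

The trade-off against the PR's `INSERT IGNORE` approach: per-statement catching costs one round-trip per failed batch and aborts the whole multi-row INSERT on the first duplicate, whereas `INSERT IGNORE` skips duplicates row by row but widens what is silenced.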
5 changes: 5 additions & 0 deletions src/Database/Adapter/SQLite.php
@@ -34,6 +34,11 @@
*/
class SQLite extends MariaDB
{
protected function getInsertKeyword(bool $ignore): string
{
return $ignore ? 'INSERT OR IGNORE INTO' : 'INSERT INTO';
}

/**
* @inheritDoc
*/
146 changes: 133 additions & 13 deletions src/Database/Database.php
@@ -5621,6 +5621,7 @@ public function createDocument(string $collection, Document $document): Document
* @param int $batchSize
* @param (callable(Document): void)|null $onNext
* @param (callable(Throwable): void)|null $onError
* @param bool $ignore If true, silently ignore duplicate documents instead of throwing
* @return int
* @throws AuthorizationException
* @throws StructureException
Expand All @@ -5633,6 +5634,7 @@ public function createDocuments(
int $batchSize = self::INSERT_BATCH_SIZE,
?callable $onNext = null,
?callable $onError = null,
bool $ignore = false,
): int {
if (!$this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument()) {
throw new DatabaseException('Shared tables must be enabled if tenant per document is enabled.');
@@ -5653,6 +5655,71 @@
$time = DateTime::now();
$modified = 0;

// Deduplicate intra-batch documents by ID when ignore mode is on.
// Keeps the first occurrence, mirrors upsertDocuments' seenIds check.
if ($ignore) {
$seenIds = [];
$deduplicated = [];
foreach ($documents as $document) {
$docId = $document->getId();
if ($docId !== '' && isset($seenIds[$docId])) {
continue;
}
if ($docId !== '') {
$seenIds[$docId] = true;
}
$deduplicated[] = $document;
}
$documents = $deduplicated;
}

// When ignore mode is on and relationships are being resolved,
// pre-fetch existing document IDs so we skip relationship writes for duplicates
$preExistingIds = [];
$tenantPerDocument = $this->adapter->getSharedTables() && $this->adapter->getTenantPerDocument();
if ($ignore) {
if ($tenantPerDocument) {
$idsByTenant = [];
foreach ($documents as $doc) {
$idsByTenant[$doc->getTenant()][] = $doc->getId();
}
foreach ($idsByTenant as $tenant => $tenantIds) {
$tenantIds = \array_values(\array_unique($tenantIds));
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$existing = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[
Query::equal('$id', $idChunk),
Query::select(['$id']),
Query::limit(\count($idChunk)),
]
))));
foreach ($existing as $doc) {
$preExistingIds[$tenant . ':' . $doc->getId()] = true;
}
}
}
} else {
$inputIds = \array_values(\array_unique(\array_filter(
\array_map(fn (Document $doc) => $doc->getId(), $documents)
)));

foreach (\array_chunk($inputIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$existing = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[
Query::equal('$id', $idChunk),
Query::select(['$id']),
Query::limit(\count($idChunk)),
]
)));
foreach ($existing as $doc) {
$preExistingIds[$doc->getId()] = true;
}
}
}
}

foreach ($documents as $document) {
$createdAt = $document->getCreatedAt();
$updatedAt = $document->getUpdatedAt();
@@ -5693,15 +5760,33 @@ public function createDocuments(
}

if ($this->resolveRelationships) {
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
$preExistKey = $tenantPerDocument
? $document->getTenant() . ':' . $document->getId()
: $document->getId();

if (!isset($preExistingIds[$preExistKey])) {
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
}
}

$document = $this->adapter->castingBefore($collection, $document);
}

foreach (\array_chunk($documents, $batchSize) as $chunk) {
$batch = $this->withTransaction(function () use ($collection, $chunk) {
return $this->adapter->createDocuments($collection, $chunk);
if ($ignore && !empty($preExistingIds)) {
$chunk = \array_values(\array_filter($chunk, function (Document $doc) use ($preExistingIds, $tenantPerDocument) {
$key = $tenantPerDocument
? $doc->getTenant() . ':' . $doc->getId()
: $doc->getId();
return !isset($preExistingIds[$key]);
}));
if (empty($chunk)) {
continue;
}
}

$batch = $this->withTransaction(function () use ($collection, $chunk, $ignore) {
return $this->adapter->createDocuments($collection, $chunk, $ignore);
});

$batch = $this->adapter->getSequences($collection->getId(), $batch);
@@ -7116,18 +7201,53 @@ public function upsertDocumentsWithIncrease(
$created = 0;
$updated = 0;
$seenIds = [];
foreach ($documents as $key => $document) {
if ($this->getSharedTables() && $this->getTenantPerDocument()) {
$old = $this->authorization->skip(fn () => $this->withTenant($document->getTenant(), fn () => $this->silent(fn () => $this->getDocument(
$collection->getId(),
$document->getId(),
))));

// Batch-fetch existing documents in chunked queries instead of N individual getDocument() calls
$ids = \array_filter(\array_map(fn ($doc) => $doc->getId(), $documents));
$existingDocs = [];
$upsertTenantPerDocument = $this->getSharedTables() && $this->getTenantPerDocument();

if (!empty($ids)) {
$uniqueIds = \array_values(\array_unique($ids));

if ($upsertTenantPerDocument) {
// Group IDs by tenant and fetch each group separately
// Use composite key tenant:id to avoid cross-tenant collisions
$idsByTenant = [];
foreach ($documents as $doc) {
$tenant = $doc->getTenant();
$idsByTenant[$tenant][] = $doc->getId();
}
foreach ($idsByTenant as $tenant => $tenantIds) {
$tenantIds = \array_values(\array_unique($tenantIds));
foreach (\array_chunk($tenantIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$fetched = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
))));
foreach ($fetched as $doc) {
$existingDocs[$tenant . ':' . $doc->getId()] = $doc;
}
}
}
} else {
$old = $this->authorization->skip(fn () => $this->silent(fn () => $this->getDocument(
$collection->getId(),
$document->getId(),
)));
foreach (\array_chunk($uniqueIds, \max(1, $this->maxQueryValues)) as $idChunk) {
$fetched = $this->authorization->skip(fn () => $this->silent(fn () => $this->find(
$collection->getId(),
[Query::equal('$id', $idChunk), Query::limit(\count($idChunk))],
)));
foreach ($fetched as $doc) {
$existingDocs[$doc->getId()] = $doc;
}
}
}
}

foreach ($documents as $key => $document) {
$lookupKey = $upsertTenantPerDocument
? $document->getTenant() . ':' . $document->getId()
: $document->getId();
$old = $existingDocs[$lookupKey] ?? new Document();

// Extract operators early to avoid comparison issues
$documentArray = $document->getArrayCopy();
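The composite `tenant:id` keying used by the batch pre-fetch above reduces to a small pure routine. This sketch (with a hypothetical helper name) shows why the tenant prefix is needed in tenant-per-document mode:

```php
// Sketch of the composite lookup key used by the batch pre-fetch:
// in tenant-per-document mode two tenants may legitimately reuse the
// same document ID, so a bare ID map would collide across tenants.
function lookupKey(?int $tenant, string $id, bool $tenantPerDocument): string
{
    return $tenantPerDocument ? $tenant . ':' . $id : $id;
}

$existing = [
    lookupKey(1, 'doc_a', true) => true, // tenant 1 owns doc_a
    lookupKey(2, 'doc_a', true) => true, // same ID under tenant 2: no collision
];

// Tenant 3's doc_a is not treated as pre-existing, even though the
// same ID exists under other tenants.
$isDuplicate = isset($existing[lookupKey(3, 'doc_a', true)]); // false
```

Without the prefix, tenant 3's insert would be wrongly skipped as a duplicate of tenant 1's document.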
3 changes: 3 additions & 0 deletions src/Database/Mirror.php
@@ -600,13 +600,15 @@ public function createDocuments(
int $batchSize = self::INSERT_BATCH_SIZE,
?callable $onNext = null,
?callable $onError = null,
bool $ignore = false,
): int {
$modified = $this->source->createDocuments(
$collection,
$documents,
$batchSize,
$onNext,
$onError,
$ignore,
);

if (
@@ -645,6 +647,7 @@ public function createDocuments(
$collection,
$clones,
$batchSize,
ignore: $ignore,
)
);
