Skip to content

Commit 242c9a8

Browse files
committed
Fix race-condition reconciliation, Mongo TPD pre-filter, exception handling, upsert seenIds
- SQL: after INSERT IGNORE, verify rowCount and reconcile returned docs by comparing _createdAt timestamps to detect race-condition inserts; rebuild permissions only for actually-inserted docs - Mongo: tenant-per-document aware pre-filter with per-tenant grouping - Mongo: wrap pre-filter find/getMore in try/catch for exception mapping - Database: use tenant-aware composite key in upsert seenIds check - Fix PHPStan errors in deferred relationship block
1 parent 20846d3 commit 242c9a8

3 files changed

Lines changed: 180 additions & 62 deletions

File tree

src/Database/Adapter/Mongo.php

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,44 +1490,87 @@ public function createDocuments(Document $collection, array $documents, bool $ig
14901490
// In ignore mode, pre-filter duplicates within the same session to avoid
14911491
// BulkWriteException which would abort the transaction.
14921492
if ($ignore && !empty($records)) {
1493-
$uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records));
1494-
if (!empty($uids)) {
1495-
$uidValues = \array_values(\array_unique($uids));
1496-
$findOptions = $this->getTransactionOptions([
1497-
'projection' => ['_uid' => 1],
1498-
'batchSize' => \count($uidValues),
1499-
]);
1500-
$filters = ['_uid' => ['$in' => $uidValues]];
1501-
if ($this->sharedTables) {
1502-
$filters['_tenant'] = $this->getTenantFilters($collection->getId());
1503-
}
1504-
$response = $this->client->find($name, $filters, $findOptions);
1505-
$existingUids = [];
1506-
foreach ($response->cursor->firstBatch ?? [] as $doc) {
1507-
$existingUids[$doc->_uid] = true;
1508-
}
1509-
$cursorId = $response->cursor->id ?? null;
1510-
while ($cursorId && $cursorId !== 0) {
1511-
$more = $this->client->getMore((int)$cursorId, $name, \count($uidValues));
1512-
foreach ($more->cursor->nextBatch ?? [] as $doc) {
1513-
$existingUids[$doc->_uid] = true;
1514-
}
1515-
$cursorId = (int)($more->cursor->id ?? 0);
1516-
}
1493+
$existingKeys = [];
15171494

1518-
if (!empty($existingUids)) {
1519-
$filteredRecords = [];
1520-
$filteredDocuments = [];
1521-
foreach ($records as $i => $record) {
1495+
try {
1496+
if ($this->sharedTables && $this->tenantPerDocument) {
1497+
// Group by tenant for tenant-per-document mode
1498+
$idsByTenant = [];
1499+
foreach ($records as $record) {
15221500
$uid = $record['_uid'] ?? '';
1523-
if (!isset($existingUids[$uid])) {
1524-
$filteredRecords[] = $record;
1525-
$filteredDocuments[] = $documents[$i];
1501+
if ($uid === '') {
1502+
continue;
1503+
}
1504+
$tenant = $record['_tenant'] ?? $this->getTenant();
1505+
$idsByTenant[$tenant][] = $uid;
1506+
}
1507+
1508+
foreach ($idsByTenant as $tenant => $tenantUids) {
1509+
$tenantUids = \array_values(\array_unique($tenantUids));
1510+
$findOptions = $this->getTransactionOptions([
1511+
'projection' => ['_uid' => 1],
1512+
'batchSize' => \count($tenantUids),
1513+
]);
1514+
$filters = ['_uid' => ['$in' => $tenantUids], '_tenant' => $tenant];
1515+
$response = $this->client->find($name, $filters, $findOptions);
1516+
foreach ($response->cursor->firstBatch ?? [] as $doc) {
1517+
$existingKeys[$tenant . ':' . $doc->_uid] = true;
1518+
}
1519+
$cursorId = $response->cursor->id ?? null;
1520+
while ($cursorId && $cursorId !== 0) {
1521+
$more = $this->client->getMore((int)$cursorId, $name, \count($tenantUids));
1522+
foreach ($more->cursor->nextBatch ?? [] as $doc) {
1523+
$existingKeys[$tenant . ':' . $doc->_uid] = true;
1524+
}
1525+
$cursorId = (int)($more->cursor->id ?? 0);
15261526
}
15271527
}
1528-
$records = $filteredRecords;
1529-
$documents = $filteredDocuments;
1528+
} else {
1529+
$uids = \array_filter(\array_map(fn ($r) => $r['_uid'] ?? null, $records));
1530+
if (!empty($uids)) {
1531+
$uidValues = \array_values(\array_unique($uids));
1532+
$findOptions = $this->getTransactionOptions([
1533+
'projection' => ['_uid' => 1],
1534+
'batchSize' => \count($uidValues),
1535+
]);
1536+
$filters = ['_uid' => ['$in' => $uidValues]];
1537+
if ($this->sharedTables) {
1538+
$filters['_tenant'] = $this->getTenantFilters($collection->getId());
1539+
}
1540+
$response = $this->client->find($name, $filters, $findOptions);
1541+
foreach ($response->cursor->firstBatch ?? [] as $doc) {
1542+
$existingKeys[$doc->_uid] = true;
1543+
}
1544+
$cursorId = $response->cursor->id ?? null;
1545+
while ($cursorId && $cursorId !== 0) {
1546+
$more = $this->client->getMore((int)$cursorId, $name, \count($uidValues));
1547+
foreach ($more->cursor->nextBatch ?? [] as $doc) {
1548+
$existingKeys[$doc->_uid] = true;
1549+
}
1550+
$cursorId = (int)($more->cursor->id ?? 0);
1551+
}
1552+
}
1553+
}
1554+
} catch (MongoException $e) {
1555+
throw $this->processException($e);
1556+
}
1557+
1558+
if (!empty($existingKeys)) {
1559+
$filteredRecords = [];
1560+
$filteredDocuments = [];
1561+
$tenantPerDoc = $this->sharedTables && $this->tenantPerDocument;
1562+
foreach ($records as $i => $record) {
1563+
$uid = $record['_uid'] ?? '';
1564+
$key = $tenantPerDoc
1565+
? ($record['_tenant'] ?? $this->getTenant()) . ':' . $uid
1566+
: $uid;
1567+
if (!isset($existingKeys[$key])) {
1568+
$filteredRecords[] = $record;
1569+
$filteredDocuments[] = $documents[$i];
1570+
}
15301571
}
1572+
$records = $filteredRecords;
1573+
$documents = $filteredDocuments;
15311574
}
15321575

15331576
if (empty($records)) {

src/Database/Adapter/SQL.php

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2658,6 +2658,79 @@ public function createDocuments(Document $collection, array $documents, bool $ig
26582658

26592659
$this->execute($stmt);
26602660

2661+
// When ignore mode is on and a race condition caused some rows to be
2662+
// silently skipped, reconcile $documents and $permissions to match reality.
2663+
if ($ignore && $stmt->rowCount() < \count($documents)) {
2664+
// Build a map of expected _createdAt per UID from our documents
2665+
$expectedTimestamps = [];
2666+
foreach ($documents as $doc) {
2667+
$expectedTimestamps[$doc->getId()] = $doc->getCreatedAt();
2668+
}
2669+
2670+
// Query back to find which UIDs actually have our timestamp
2671+
$verifyPlaceholders = [];
2672+
$verifyBinds = [];
2673+
foreach (\array_values(\array_unique(\array_keys($expectedTimestamps))) as $idx => $uid) {
2674+
$ph = ':_vfy_' . $idx;
2675+
$verifyPlaceholders[] = $ph;
2676+
$verifyBinds[$ph] = $uid;
2677+
}
2678+
2679+
$tenantWhere = '';
2680+
if ($this->sharedTables) {
2681+
$tenantWhere = ' AND _tenant = :_vfy_tenant';
2682+
$verifyBinds[':_vfy_tenant'] = $this->getTenant();
2683+
}
2684+
2685+
$verifySql = 'SELECT _uid, ' . $this->quote($this->filter('_createdAt'))
2686+
. ' FROM ' . $this->getSQLTable($name)
2687+
. ' WHERE _uid IN (' . \implode(', ', $verifyPlaceholders) . ')'
2688+
. $tenantWhere;
2689+
2690+
$verifyStmt = $this->getPDO()->prepare($verifySql);
2691+
foreach ($verifyBinds as $k => $v) {
2692+
$verifyStmt->bindValue($k, $v, $this->getPDOType($v));
2693+
}
2694+
$verifyStmt->execute();
2695+
$rows = $verifyStmt->fetchAll();
2696+
$verifyStmt->closeCursor();
2697+
2698+
// Keep only docs whose _createdAt matches what we set (= ours, not racer's)
2699+
$actualTimestamps = [];
2700+
foreach ($rows as $row) {
2701+
$actualTimestamps[$row['_uid']] = $row['_createdAt'];
2702+
}
2703+
2704+
$insertedDocs = [];
2705+
$insertedUids = [];
2706+
foreach ($documents as $doc) {
2707+
$uid = $doc->getId();
2708+
if (isset($actualTimestamps[$uid]) && $actualTimestamps[$uid] === $expectedTimestamps[$uid]) {
2709+
$insertedDocs[] = $doc;
2710+
$insertedUids[$uid] = true;
2711+
}
2712+
}
2713+
$documents = $insertedDocs;
2714+
2715+
// Rebuild permissions for only the actually-inserted docs
2716+
$permissions = [];
2717+
$bindValuesPermissions = [];
2718+
foreach ($documents as $index => $document) {
2719+
foreach (Database::PERMISSIONS as $type) {
2720+
foreach ($document->getPermissionsByType($type) as $permission) {
2721+
$tenantBind = $this->sharedTables ? ", :_tenant_{$index}" : '';
2722+
$permission = \str_replace('"', '', $permission);
2723+
$permission = "('{$type}', '{$permission}', :_uid_{$index} {$tenantBind})";
2724+
$permissions[] = $permission;
2725+
$bindValuesPermissions[":_uid_{$index}"] = $document->getId();
2726+
if ($this->sharedTables) {
2727+
$bindValuesPermissions[":_tenant_{$index}"] = $document->getTenant();
2728+
}
2729+
}
2730+
}
2731+
}
2732+
}
2733+
26612734
if (!empty($permissions)) {
26622735
$tenantColumn = $this->sharedTables ? ', _tenant' : '';
26632736
$permissions = \implode(', ', $permissions);

src/Database/Database.php

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5726,10 +5726,12 @@ public function createDocuments(
57265726
}
57275727

57285728
// For ignore mode: defer relationship creation until after INSERT
5729+
/** @var array<string, array<string, mixed>> $deferredRelationships */
57295730
$deferredRelationships = [];
5730-
$relationships = $ignore && $this->resolveRelationships
5731-
? \array_filter($collection->getAttribute('attributes', []), fn ($attr) => $attr['type'] === self::VAR_RELATIONSHIP)
5732-
: [];
5731+
$relationships = [];
5732+
if ($ignore && $this->resolveRelationships) {
5733+
$relationships = \array_filter($collection->getAttribute('attributes', []), fn ($attr) => $attr['type'] === self::VAR_RELATIONSHIP);
5734+
}
57335735

57345736
foreach ($documents as $document) {
57355737
$createdAt = $document->getCreatedAt();
@@ -5770,31 +5772,29 @@ public function createDocuments(
57705772
}
57715773
}
57725774

5773-
if ($this->resolveRelationships) {
5774-
if ($ignore) {
5775-
// In ignore mode, defer relationship creation until after INSERT
5776-
// to avoid orphaned relationships from race-condition duplicates.
5777-
// Store original relationship data and strip attributes for INSERT.
5778-
$relationshipData = [];
5779-
foreach ($relationships as $rel) {
5780-
$key = $rel['key'];
5781-
$value = $document->getAttribute($key);
5782-
if ($value !== null) {
5783-
$relationshipData[$key] = $value;
5784-
}
5785-
$document->removeAttribute($key);
5786-
}
5787-
if (!empty($relationshipData)) {
5788-
$deferredRelationships[$document->getId()] = $relationshipData;
5775+
if ($this->resolveRelationships && !empty($relationships)) {
5776+
// In ignore mode, defer relationship creation until after INSERT
5777+
// to avoid orphaned relationships from race-condition duplicates.
5778+
// Store original relationship data and strip attributes for INSERT.
5779+
$relationshipData = [];
5780+
foreach ($relationships as $rel) {
5781+
$key = $rel['key'];
5782+
$value = $document->getAttribute($key);
5783+
if ($value !== null) {
5784+
$relationshipData[$key] = $value;
57895785
}
5790-
} else {
5791-
$preExistKey = $tenantPerDocument
5792-
? $document->getTenant() . ':' . $document->getId()
5793-
: $document->getId();
5786+
$document->removeAttribute($key);
5787+
}
5788+
if (!empty($relationshipData)) {
5789+
$deferredRelationships[$document->getId()] = $relationshipData;
5790+
}
5791+
} elseif ($this->resolveRelationships) {
5792+
$preExistKey = $tenantPerDocument
5793+
? $document->getTenant() . ':' . $document->getId()
5794+
: $document->getId();
57945795

5795-
if (!isset($preExistingIds[$preExistKey])) {
5796-
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
5797-
}
5796+
if (!isset($preExistingIds[$preExistKey])) {
5797+
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
57985798
}
57995799
}
58005800

@@ -5819,10 +5819,10 @@ public function createDocuments(
58195819
});
58205820

58215821
// In ignore mode, create relationships only for docs actually inserted
5822-
if ($ignore && $this->resolveRelationships && !empty($deferredRelationships)) {
5822+
if ($ignore && $this->resolveRelationships && \count($deferredRelationships) > 0) {
58235823
foreach ($batch as $insertedDoc) {
58245824
$docId = $insertedDoc->getId();
5825-
if (isset($deferredRelationships[$docId])) {
5825+
if (\array_key_exists($docId, $deferredRelationships)) {
58265826
$relDoc = clone $insertedDoc;
58275827
foreach ($deferredRelationships[$docId] as $key => $value) {
58285828
$relDoc->setAttribute($key, $value);
@@ -7458,7 +7458,9 @@ public function upsertDocumentsWithIncrease(
74587458
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
74597459
}
74607460

7461-
$seenIds[] = $document->getId();
7461+
$seenIds[] = $upsertTenantPerDocument
7462+
? $document->getTenant() . ':' . $document->getId()
7463+
: $document->getId();
74627464
$old = $this->adapter->castingBefore($collection, $old);
74637465
$document = $this->adapter->castingBefore($collection, $document);
74647466

0 commit comments

Comments
 (0)