Skip to content

Commit 89e4cf8

Browse files
committed
skipDuplicates: drop deferred-relationships, inline pre-filter, tighten comments
The deferred-relationships mechanism was useful when the adapter could report which docs were actually inserted (via reconciliation/ upsertWithCounts). With those gone, $batch always equals $chunk, so the defer-then-match dance is dead weight. The orchestrator already pre- filters known duplicates upfront, so any survivor is expected to insert and relationships can be created inline like the non-skipDuplicates path. - Database.php: pre-filter $documents once, before encode/validate, instead of filtering each chunk inline. Drop $deferredRelationships scaffolding and the per-chunk match loop. Collapse if/elseif into one inline createDocumentRelationships call. - Mongo.php: tighten withTransaction skipDuplicates comment (the "relationship writes are deferred" rationale is no longer accurate). - Mirror.php: tighten the captureOnNext comment. - composer.lock: revert unrelated symfony-polyfill-php85 / mongo 1.0.0→ 1.0.2 bumps that crept in from an earlier composer update. Race window unchanged: a doc that passes pre-filter but loses to a concurrent insert still produces an orphan relationship and an over-counted $modified — same as before this commit.
1 parent eb99cf1 commit 89e4cf8

4 files changed

Lines changed: 28 additions & 84 deletions

File tree

composer.lock

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Database/Adapter/Mongo.php

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,7 @@ public function withTransaction(callable $callback): mixed
122122
return $callback();
123123
}
124124

125-
// skipDuplicates uses upsert + $setOnInsert, whose filter query would hit
126-
// WriteConflict (E112) under transaction snapshot isolation. The upsert is
127-
// atomic on its own and relationship writes are deferred by the Database
128-
// layer, so running outside a transaction is both safe and necessary.
125+
// upsert + $setOnInsert hits WriteConflict (E112) under txn snapshot isolation.
129126
if ($this->skipDuplicates) {
130127
return $callback();
131128
}
@@ -1500,11 +1497,7 @@ public function createDocuments(Document $collection, array $documents): array
15001497
$records[] = $record;
15011498
}
15021499

1503-
// skipDuplicates: use upsert + $setOnInsert so an already-present row (whether
1504-
// from a prior batch or a concurrent writer) becomes a silent no-op. MongoDB has
1505-
// no INSERT IGNORE, and plain insertMany aborts the transaction on any duplicate.
1506-
// Adapter::withTransaction opts out of a real transaction in this mode to avoid
1507-
// snapshot-isolation-induced WriteConflict errors from the upsert's filter query.
1500+
// insertMany aborts the txn on any duplicate; upsert + $setOnInsert no-ops instead.
15081501
if ($this->skipDuplicates) {
15091502
if (empty($records)) {
15101503
return [];
@@ -1517,8 +1510,7 @@ public function createDocuments(Document $collection, array $documents): array
15171510
$filter['_tenant'] = $record['_tenant'] ?? $this->getTenant();
15181511
}
15191512

1520-
// Filter fields cannot reappear in $setOnInsert — MongoDB rejects
1521-
// the update with a path-conflict error otherwise.
1513+
// Filter fields can't reappear in $setOnInsert (mongo path-conflict error).
15221514
$setOnInsert = $record;
15231515
unset($setOnInsert['_uid'], $setOnInsert['_tenant']);
15241516

src/Database/Database.php

Lines changed: 11 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5748,7 +5748,8 @@ public function createDocuments(
57485748
$time = DateTime::now();
57495749
$modified = 0;
57505750

5751-
// Deduplicate intra-batch documents by ID (tenant-aware). First occurrence wins.
5751+
// skipDuplicates: dedupe intra-batch (first wins) then drop already-existing rows.
5752+
// Adapter INSERT IGNORE / ON CONFLICT / upsert is the race-safety net.
57525753
if ($this->skipDuplicates) {
57535754
$seenIds = [];
57545755
$deduplicated = [];
@@ -5763,21 +5764,14 @@ public function createDocuments(
57635764
$deduplicated[] = $document;
57645765
}
57655766
$documents = $deduplicated;
5766-
}
5767-
5768-
// Pre-fetch existing IDs so chunks only carry docs that don't already exist.
5769-
// The adapter still applies INSERT IGNORE / ON CONFLICT DO NOTHING / upsert as a
5770-
// race-safety net, but the pre-filter handles the common case so the adapter can
5771-
// simply return its input as the inserted set without per-row reconciliation.
5772-
$preExistingIds = $this->skipDuplicates
5773-
? $this->fetchExistingByIds($collection, $documents, idsOnly: true)
5774-
: [];
57755767

5776-
/** @var array<string, array<string, mixed>> $deferredRelationships */
5777-
$deferredRelationships = [];
5778-
$relationships = [];
5779-
if ($this->skipDuplicates && $this->resolveRelationships) {
5780-
$relationships = \array_filter($collection->getAttribute('attributes', []), fn ($attr) => $attr['type'] === self::VAR_RELATIONSHIP);
5768+
$preExistingIds = $this->fetchExistingByIds($collection, $documents, idsOnly: true);
5769+
if (!empty($preExistingIds)) {
5770+
$documents = \array_values(\array_filter(
5771+
$documents,
5772+
fn (Document $doc) => !isset($preExistingIds[$this->tenantKey($doc)])
5773+
));
5774+
}
57815775
}
57825776

57835777
foreach ($documents as $document) {
@@ -5819,60 +5813,20 @@ public function createDocuments(
58195813
}
58205814
}
58215815

5822-
if ($this->resolveRelationships && !empty($relationships)) {
5823-
// Defer relationship writes until after INSERT so we don't orphan records for duplicates.
5824-
$relationshipData = [];
5825-
foreach ($relationships as $rel) {
5826-
$key = $rel['key'];
5827-
$value = $document->getAttribute($key);
5828-
if ($value !== null) {
5829-
$relationshipData[$key] = $value;
5830-
}
5831-
$document->removeAttribute($key);
5832-
}
5833-
if (!empty($relationshipData)) {
5834-
$deferredRelationships[$this->tenantKey($document)] = $relationshipData;
5835-
}
5836-
} elseif ($this->resolveRelationships) {
5816+
if ($this->resolveRelationships) {
58375817
$document = $this->silent(fn () => $this->createDocumentRelationships($collection, $document));
58385818
}
58395819

58405820
$document = $this->adapter->castingBefore($collection, $document);
58415821
}
58425822

58435823
foreach (\array_chunk($documents, $batchSize) as $chunk) {
5844-
if ($this->skipDuplicates && !empty($preExistingIds)) {
5845-
$chunk = \array_values(\array_filter(
5846-
$chunk,
5847-
fn (Document $doc) => !isset($preExistingIds[$this->tenantKey($doc)])
5848-
));
5849-
if (empty($chunk)) {
5850-
continue;
5851-
}
5852-
}
5853-
5854-
// skipDuplicates wraps withTransaction so the adapter's flag is set before
5855-
// Mongo::withTransaction decides whether to start a real transaction.
5824+
// Set adapter flag before withTransaction so Mongo can opt out of a real txn.
58565825
$batch = $this->adapter->skipDuplicates(
58575826
fn () => $this->withTransaction(fn () => $this->adapter->createDocuments($collection, $chunk)),
58585827
$this->skipDuplicates
58595828
);
58605829

5861-
// Create deferred relationships only for docs that were actually inserted
5862-
if ($this->skipDuplicates && $this->resolveRelationships && \count($deferredRelationships) > 0) {
5863-
foreach ($batch as $insertedDoc) {
5864-
$deferKey = $this->tenantKey($insertedDoc);
5865-
if (\array_key_exists($deferKey, $deferredRelationships)) {
5866-
$relDoc = clone $insertedDoc;
5867-
foreach ($deferredRelationships[$deferKey] as $key => $value) {
5868-
$relDoc->setAttribute($key, $value);
5869-
}
5870-
$this->silent(fn () => $this->createDocumentRelationships($collection, $relDoc));
5871-
unset($deferredRelationships[$deferKey]);
5872-
}
5873-
}
5874-
}
5875-
58765830
$batch = $this->adapter->getSequences($collection->getId(), $batch);
58775831

58785832
if (!$this->inBatchRelationshipPopulation && $this->resolveRelationships) {

src/Database/Mirror.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -601,10 +601,8 @@ public function createDocuments(
601601
?callable $onNext = null,
602602
?callable $onError = null,
603603
): int {
604-
// Capture the docs the source actually persisted (rather than the input)
605-
// so that, in skipDuplicates mode, we don't end up forwarding skipped
606-
// duplicates to the destination — that would let the destination diverge
607-
// from the source whenever the source no-ops a write.
604+
// Forward only docs the source actually persisted, so skipDuplicates no-ops
605+
// on the source don't inject would-be values into the destination.
608606
/** @var array<Document> $insertedFromSource */
609607
$insertedFromSource = [];
610608
$captureOnNext = function (Document $doc) use (&$insertedFromSource, $onNext): void {

0 commit comments

Comments
 (0)