Skip to content

Commit 3c85013

Browse files
committed
Mirror::createDocuments: re-fetch source after skipDuplicates insert
Addresses Greptile #3084293974. The captureOnNext-based forwarding was structurally broken post-pre-filter-removal: SQL adapters return the full input batch from createDocuments regardless of how many rows INSERT IGNORE actually inserted, so onNext fired for every document including skipped duplicates. captureOnNext buffered all of them and flushed the full buffer (carrying the caller's would-be values) to destination, diverging destination from source whenever source had a pre-existing row the caller tried to overwrite. Fix: instead of trying to distinguish inserted vs skipped at insert time, re-fetch source's authoritative state via find() after the adapter write settles, then forward that to destination. Race-free regardless of concurrent writers — destination always receives a faithful snapshot of whatever source holds, because the query runs after source's write has resolved. Cost: one extra SELECT per batch against source when skipDuplicates is active AND Mirror has an upgraded destination. Zero cost on the non-skip path. Memory remains O(batch_size). Net ~47 lines smaller than the previous capture-based approach: - captureOnNext closure removed - flushBuffer closure removed - bounded-buffer machinery removed - shouldMirror upfront-snapshot removed (inlined as eligibility check) Test: testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination now asserts destination holds source's 'Original' value rather than the caller's 'WouldBe' input, per Greptile's suggested assertion.
1 parent 9baaa04 commit 3c85013

2 files changed

Lines changed: 74 additions & 72 deletions

File tree

src/Database/Mirror.php

Lines changed: 68 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -601,95 +601,91 @@ public function createDocuments(
601601
?callable $onNext = null,
602602
?callable $onError = null,
603603
): int {
604-
// skipDuplicates: forward only what the source persisted; flush-on-fill keeps memory O($batchSize).
605604
if ($this->skipDuplicates) {
606-
// Resolve destination eligibility upfront so the flush closure can short-circuit.
607-
$shouldMirror = !\in_array($collection, self::SOURCE_ONLY_COLLECTIONS)
608-
&& $this->destination !== null;
605+
$modified = $this->source->skipDuplicates(
606+
fn () => $this->source->createDocuments(
607+
$collection,
608+
$documents,
609+
$batchSize,
610+
$onNext,
611+
$onError,
612+
)
613+
);
609614

610-
if ($shouldMirror) {
611-
$upgrade = $this->silent(fn () => $this->getUpgradeStatus($collection));
612-
$shouldMirror = $upgrade !== null && $upgrade->getAttribute('status', '') === 'upgraded';
615+
if (
616+
\in_array($collection, self::SOURCE_ONLY_COLLECTIONS)
617+
|| $this->destination === null
618+
) {
619+
return $modified;
613620
}
614621

615-
/** @var array<Document> $buffer */
616-
$buffer = [];
622+
$upgrade = $this->silent(fn () => $this->getUpgradeStatus($collection));
623+
if ($upgrade === null || $upgrade->getAttribute('status', '') !== 'upgraded') {
624+
return $modified;
625+
}
617626

618-
$flushBuffer = function () use (&$buffer, $collection, $batchSize, $shouldMirror): void {
619-
if (!$shouldMirror || empty($buffer)) {
620-
$buffer = [];
621-
return;
627+
try {
628+
// Adapter-level INSERT IGNORE does not report per-row outcomes, so
629+
// forwarding the caller's input would diverge on source-skipped duplicates.
630+
// Re-fetch source's authoritative state after its write settles and
631+
// forward that — race-free regardless of concurrent writers.
632+
$ids = \array_values(\array_filter(
633+
\array_map(fn (Document $d) => $d->getId(), $documents),
634+
fn ($id) => $id !== ''
635+
));
636+
637+
if (empty($ids)) {
638+
return $modified;
622639
}
623640

624-
try {
625-
$clones = [];
626-
foreach ($buffer as $document) {
627-
$clone = clone $document;
628-
foreach ($this->writeFilters as $filter) {
629-
$clone = $filter->beforeCreateDocument(
630-
source: $this->source,
631-
destination: $this->destination,
632-
collectionId: $collection,
633-
document: $clone,
634-
);
635-
}
636-
$clones[] = $clone;
637-
}
638-
639-
$this->destination->skipDuplicates(
640-
fn () => $this->destination->withPreserveDates(
641-
fn () => $this->destination->createDocuments(
642-
$collection,
643-
$clones,
644-
$batchSize,
645-
)
646-
)
647-
);
641+
$authoritative = $this->source->silent(
642+
fn () => $this->source->find($collection, [
643+
Query::equal('$id', $ids),
644+
Query::limit(\count($ids)),
645+
])
646+
);
648647

649-
foreach ($clones as $clone) {
650-
foreach ($this->writeFilters as $filter) {
651-
$filter->afterCreateDocument(
652-
source: $this->source,
653-
destination: $this->destination,
654-
collectionId: $collection,
655-
document: $clone,
656-
);
657-
}
648+
$clones = [];
649+
foreach ($authoritative as $document) {
650+
$clone = clone $document;
651+
foreach ($this->writeFilters as $filter) {
652+
$clone = $filter->beforeCreateDocument(
653+
source: $this->source,
654+
destination: $this->destination,
655+
collectionId: $collection,
656+
document: $clone,
657+
);
658658
}
659-
} catch (\Throwable $err) {
660-
$this->logError('createDocuments', $err);
659+
$clones[] = $clone;
661660
}
662661

663-
$buffer = [];
664-
};
662+
$this->destination->skipDuplicates(
663+
fn () => $this->destination->withPreserveDates(
664+
fn () => $this->destination->createDocuments(
665+
$collection,
666+
$clones,
667+
$batchSize,
668+
)
669+
)
670+
);
665671

666-
$captureOnNext = function (Document $doc) use (&$buffer, &$flushBuffer, $batchSize, $onNext): void {
667-
if ($onNext !== null) {
668-
$onNext($doc);
669-
}
670-
$buffer[] = $doc;
671-
if (\count($buffer) >= $batchSize) {
672-
$flushBuffer();
672+
foreach ($clones as $clone) {
673+
foreach ($this->writeFilters as $filter) {
674+
$filter->afterCreateDocument(
675+
source: $this->source,
676+
destination: $this->destination,
677+
collectionId: $collection,
678+
document: $clone,
679+
);
680+
}
673681
}
674-
};
675-
676-
$modified = $this->source->skipDuplicates(
677-
fn () => $this->source->createDocuments(
678-
$collection,
679-
$documents,
680-
$batchSize,
681-
$captureOnNext,
682-
$onError,
683-
)
684-
);
685-
686-
// Flush any tail (< $batchSize) that didn't trigger a flush during the source call.
687-
$flushBuffer();
682+
} catch (\Throwable $err) {
683+
$this->logError('createDocuments', $err);
684+
}
688685

689686
return $modified;
690687
}
691688

692-
// Non-skip path: pass through directly, forward original input to destination.
693689
$modified = $this->source->createDocuments(
694690
$collection,
695691
$documents,

tests/e2e/Adapter/MirrorTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,12 @@ public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): vo
378378
$database->getSource()->getDocument($collection, 'fresh')->getAttribute('name')
379379
);
380380

381+
// Destination must hold source's authoritative value, not the WouldBe input.
382+
$this->assertSame(
383+
'Original',
384+
$database->getDestination()->getDocument($collection, 'dup')->getAttribute('name'),
385+
'Destination must reflect source authoritative state, not caller input'
386+
);
381387
$this->assertSame(
382388
'Fresh',
383389
$database->getDestination()->getDocument($collection, 'fresh')->getAttribute('name')

0 commit comments

Comments
 (0)