Skip to content

Commit 934ec04

Browse files
committed
Mirror::createDocuments: pre-filter existing ids + unify skip/non-skip paths
Addresses Greptile #3084293974. The captureOnNext-based forwarding was structurally broken: SQL adapters return the full input batch from createDocuments regardless of what INSERT IGNORE actually inserted, so onNext fired for every doc including skipped duplicates. captureOnNext then forwarded the caller's would-be values to destination, diverging destination from source whenever source had a pre-existing row the caller tried to re-insert. Fix: pre-filter against source before the insert to identify which input ids already exist, then forward only the genuinely-new docs to destination. A source-skipped duplicate is a no-op (no user write happened), so nothing should propagate. This matches Greptile's semantics: destination.dup.isEmpty() after skipping a duplicate. While the logic was touched, the skip and non-skip paths (which shared eligibility check, upgrade check, clone loop, destination write, and after-filter loop) are unified into a single flow with $this-> skipDuplicates branches only at the points where behavior actually differs: - source call: wrapped in source->skipDuplicates when active - $toForward: filter out pre-existing ids when active - destination call: wrapped in destination->skipDuplicates when active Non-skip hot path pays zero new closure allocations; the pre-filter block is gated behind if ($this->skipDuplicates) and short-circuits to an empty $existingIds map. Net: -46 lines in createDocuments (~170 → ~95), same test coverage, Greptile's assertion now holds.
1 parent 3c85013 commit 934ec04

2 files changed

Lines changed: 52 additions & 98 deletions

File tree

src/Database/Mirror.php

Lines changed: 47 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -601,98 +601,34 @@ public function createDocuments(
601601
?callable $onNext = null,
602602
?callable $onError = null,
603603
): int {
604+
// In skipDuplicates mode, identify which input ids already exist on source.
605+
// These will be silently no-oped by the adapter's INSERT IGNORE and must
606+
// not propagate to destination — a skipped duplicate is not a user write.
607+
$existingIds = [];
604608
if ($this->skipDuplicates) {
605-
$modified = $this->source->skipDuplicates(
606-
fn () => $this->source->createDocuments(
607-
$collection,
608-
$documents,
609-
$batchSize,
610-
$onNext,
611-
$onError,
612-
)
613-
);
609+
$ids = \array_values(\array_filter(
610+
\array_map(fn (Document $d) => $d->getId(), $documents),
611+
fn ($id) => $id !== ''
612+
));
614613

615-
if (
616-
\in_array($collection, self::SOURCE_ONLY_COLLECTIONS)
617-
|| $this->destination === null
618-
) {
619-
return $modified;
620-
}
621-
622-
$upgrade = $this->silent(fn () => $this->getUpgradeStatus($collection));
623-
if ($upgrade === null || $upgrade->getAttribute('status', '') !== 'upgraded') {
624-
return $modified;
625-
}
626-
627-
try {
628-
// Adapter-level INSERT IGNORE does not report per-row outcomes, so
629-
// forwarding the caller's input would diverge on source-skipped duplicates.
630-
// Re-fetch source's authoritative state after its write settles and
631-
// forward that — race-free regardless of concurrent writers.
632-
$ids = \array_values(\array_filter(
633-
\array_map(fn (Document $d) => $d->getId(), $documents),
634-
fn ($id) => $id !== ''
635-
));
636-
637-
if (empty($ids)) {
638-
return $modified;
639-
}
640-
641-
$authoritative = $this->source->silent(
614+
if (!empty($ids)) {
615+
$existing = $this->source->silent(
642616
fn () => $this->source->find($collection, [
643617
Query::equal('$id', $ids),
644618
Query::limit(\count($ids)),
645619
])
646620
);
647-
648-
$clones = [];
649-
foreach ($authoritative as $document) {
650-
$clone = clone $document;
651-
foreach ($this->writeFilters as $filter) {
652-
$clone = $filter->beforeCreateDocument(
653-
source: $this->source,
654-
destination: $this->destination,
655-
collectionId: $collection,
656-
document: $clone,
657-
);
658-
}
659-
$clones[] = $clone;
621+
foreach ($existing as $doc) {
622+
$existingIds[$doc->getId()] = true;
660623
}
661-
662-
$this->destination->skipDuplicates(
663-
fn () => $this->destination->withPreserveDates(
664-
fn () => $this->destination->createDocuments(
665-
$collection,
666-
$clones,
667-
$batchSize,
668-
)
669-
)
670-
);
671-
672-
foreach ($clones as $clone) {
673-
foreach ($this->writeFilters as $filter) {
674-
$filter->afterCreateDocument(
675-
source: $this->source,
676-
destination: $this->destination,
677-
collectionId: $collection,
678-
document: $clone,
679-
);
680-
}
681-
}
682-
} catch (\Throwable $err) {
683-
$this->logError('createDocuments', $err);
684624
}
685-
686-
return $modified;
687625
}
688626

689-
$modified = $this->source->createDocuments(
690-
$collection,
691-
$documents,
692-
$batchSize,
693-
$onNext,
694-
$onError,
695-
);
627+
$modified = $this->skipDuplicates
628+
? $this->source->skipDuplicates(
629+
fn () => $this->source->createDocuments($collection, $documents, $batchSize, $onNext, $onError)
630+
)
631+
: $this->source->createDocuments($collection, $documents, $batchSize, $onNext, $onError);
696632

697633
if (
698634
\in_array($collection, self::SOURCE_ONLY_COLLECTIONS)
@@ -706,12 +642,23 @@ public function createDocuments(
706642
return $modified;
707643
}
708644

645+
// In skipDuplicates mode, drop pre-existing ids so their no-op writes
646+
// don't propagate. Non-skip mode forwards everything as before.
647+
$toForward = $this->skipDuplicates
648+
? \array_values(\array_filter(
649+
$documents,
650+
fn (Document $d) => $d->getId() === '' || !isset($existingIds[$d->getId()])
651+
))
652+
: $documents;
653+
654+
if (empty($toForward)) {
655+
return $modified;
656+
}
657+
709658
try {
710659
$clones = [];
711-
712-
foreach ($documents as $document) {
660+
foreach ($toForward as $document) {
713661
$clone = clone $document;
714-
715662
foreach ($this->writeFilters as $filter) {
716663
$clone = $filter->beforeCreateDocument(
717664
source: $this->source,
@@ -720,18 +667,25 @@ public function createDocuments(
720667
document: $clone,
721668
);
722669
}
723-
724670
$clones[] = $clone;
725671
}
726672

727-
$this->destination->withPreserveDates(
728-
fn () =>
729-
$this->destination->createDocuments(
730-
$collection,
731-
$clones,
732-
$batchSize,
733-
)
734-
);
673+
if ($this->skipDuplicates) {
674+
$this->destination->skipDuplicates(
675+
fn () => $this->destination->withPreserveDates(
676+
fn () => $this->destination->createDocuments($collection, $clones, $batchSize)
677+
)
678+
);
679+
} else {
680+
$this->destination->withPreserveDates(
681+
fn () =>
682+
$this->destination->createDocuments(
683+
$collection,
684+
$clones,
685+
$batchSize,
686+
)
687+
);
688+
}
735689

736690
foreach ($clones as $clone) {
737691
foreach ($this->writeFilters as $filter) {

tests/e2e/Adapter/MirrorTest.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -378,11 +378,11 @@ public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): vo
378378
$database->getSource()->getDocument($collection, 'fresh')->getAttribute('name')
379379
);
380380

381-
// Destination must hold source's authoritative value, not the WouldBe input.
382-
$this->assertSame(
383-
'Original',
384-
$database->getDestination()->getDocument($collection, 'dup')->getAttribute('name'),
385-
'Destination must reflect source authoritative state, not caller input'
381+
// A source-skipped duplicate is a no-op and must not propagate to
382+
// destination. Only the genuinely-inserted 'fresh' row should mirror.
383+
$this->assertTrue(
384+
$database->getDestination()->getDocument($collection, 'dup')->isEmpty(),
385+
'Source-skipped doc must not be inserted into destination'
386386
);
387387
$this->assertSame(
388388
'Fresh',

0 commit comments

Comments
 (0)