Skip to content

Commit 3a483f2

Browse files
committed
Mirror: forward only docs the source actually inserted
Previously Mirror::createDocuments built its destination payload from the original input ($documents), not from the docs the source actually persisted. In skipDuplicates mode that lets the destination diverge from the source whenever the source no-ops a write: source has id=a, name='Original' dest missing id=a caller skipDuplicates(createDocuments([{id: a, name: 'Duplicate'}])) The source skips a (already present) but the Mirror still forwards {id: a, name: 'Duplicate'} to the destination, which inserts it — the two replicas now hold different values for the same row. The same code path also throws "All documents must have an sequence if one is set" inside Database::createDocuments when a partially-skipped batch reaches the destination, because the inserted doc has a sequence assigned by the source adapter while the skipped doc does not. Mirror catches and silently logs that exception, so the destination ends up missing both rows. Fix: wrap the user's onNext to capture the docs the source actually persisted, then forward only those to the destination. Skipped docs are now correctly never mirrored. Adds testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination — seeds a row in the source only (bypassing the mirror), then calls skipDuplicates(createDocuments(...)) with a different value for that id plus a new row. Asserts source keeps its original value, destination ends up with only the new row, and the skipped value never reaches either side.
1 parent 3b783af commit 3a483f2

2 files changed

Lines changed: 109 additions & 2 deletions

File tree

src/Database/Mirror.php

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -601,12 +601,25 @@ public function createDocuments(
601601
?callable $onNext = null,
602602
?callable $onError = null,
603603
): int {
604+
// Capture the docs the source actually persisted (rather than the input)
605+
// so that, in skipDuplicates mode, we don't end up forwarding skipped
606+
// duplicates to the destination — that would let the destination diverge
607+
// from the source whenever the source no-ops a write.
608+
/** @var array<Document> $insertedFromSource */
609+
$insertedFromSource = [];
610+
$captureOnNext = function (Document $doc) use (&$insertedFromSource, $onNext): void {
611+
$insertedFromSource[] = $doc;
612+
if ($onNext !== null) {
613+
$onNext($doc);
614+
}
615+
};
616+
604617
$modified = $this->source->skipDuplicates(
605618
fn () => $this->source->createDocuments(
606619
$collection,
607620
$documents,
608621
$batchSize,
609-
$onNext,
622+
$captureOnNext,
610623
$onError,
611624
),
612625
$this->skipDuplicates
@@ -624,10 +637,15 @@ public function createDocuments(
624637
return $modified;
625638
}
626639

640+
// Nothing for the source to mirror — early-out before touching destination.
641+
if (empty($insertedFromSource)) {
642+
return $modified;
643+
}
644+
627645
try {
628646
$clones = [];
629647

630-
foreach ($documents as $document) {
648+
foreach ($insertedFromSource as $document) {
631649
$clone = clone $document;
632650

633651
foreach ($this->writeFilters as $filter) {

tests/e2e/Adapter/MirrorTest.php

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,95 @@ public function testDeleteMirroredDocument(): void
313313
$this->assertTrue($database->getDestination()->getDocument('testDeleteMirroredDocument', $document->getId())->isEmpty());
314314
}
315315

316+
/**
317+
* Regression: when skipDuplicates causes the source to no-op a write,
318+
* the Mirror must NOT forward the input document to the destination.
319+
* Otherwise the destination would diverge from the source — receiving
320+
* the would-be value for a row the source kept unchanged.
321+
*/
322+
public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): void
323+
{
324+
$database = $this->getDatabase();
325+
$collection = 'mirrorSkipDup';
326+
327+
$database->createCollection($collection, attributes: [
328+
new Document([
329+
'$id' => 'name',
330+
'type' => Database::VAR_STRING,
331+
'required' => true,
332+
'size' => Database::LENGTH_KEY,
333+
]),
334+
], permissions: [
335+
Permission::create(Role::any()),
336+
Permission::read(Role::any()),
337+
], documentSecurity: false);
338+
339+
// Seed the SOURCE only (bypass the mirror) with the row we want to
340+
// skipDuplicates over later. Destination intentionally does NOT have it.
341+
$database->getSource()->createDocument($collection, new Document([
342+
'$id' => 'dup',
343+
'name' => 'Original',
344+
'$permissions' => [
345+
Permission::read(Role::any()),
346+
Permission::create(Role::any()),
347+
],
348+
]));
349+
350+
// Sanity check setup
351+
$this->assertSame(
352+
'Original',
353+
$database->getSource()->getDocument($collection, 'dup')->getAttribute('name')
354+
);
355+
$this->assertTrue(
356+
$database->getDestination()->getDocument($collection, 'dup')->isEmpty()
357+
);
358+
359+
// Now do skipDuplicates createDocuments via the Mirror with a different
360+
// value for 'dup' plus one new doc 'fresh'. The source must skip 'dup'
361+
// (already exists) and insert 'fresh'. The Mirror must forward only
362+
// 'fresh' to the destination — NOT 'dup' with the would-be value.
363+
$database->skipDuplicates(fn () => $database->createDocuments($collection, [
364+
new Document([
365+
'$id' => 'dup',
366+
'name' => 'WouldBe',
367+
'$permissions' => [
368+
Permission::read(Role::any()),
369+
Permission::create(Role::any()),
370+
],
371+
]),
372+
new Document([
373+
'$id' => 'fresh',
374+
'name' => 'Fresh',
375+
'$permissions' => [
376+
Permission::read(Role::any()),
377+
Permission::create(Role::any()),
378+
],
379+
]),
380+
]));
381+
382+
// Source: 'dup' untouched, 'fresh' inserted.
383+
$this->assertSame(
384+
'Original',
385+
$database->getSource()->getDocument($collection, 'dup')->getAttribute('name')
386+
);
387+
$this->assertSame(
388+
'Fresh',
389+
$database->getSource()->getDocument($collection, 'fresh')->getAttribute('name')
390+
);
391+
392+
// Destination: 'fresh' was forwarded, 'dup' was NOT (would-be value
393+
// is the bug — destination should not have 'dup' at all because the
394+
// source skipped it and the destination was never told to insert it).
395+
$this->assertTrue(
396+
$database->getDestination()->getDocument($collection, 'dup')->isEmpty(),
397+
'Mirror must not forward source-skipped docs to the destination'
398+
);
399+
$this->assertSame(
400+
'Fresh',
401+
$database->getDestination()->getDocument($collection, 'fresh')->getAttribute('name')
402+
);
403+
}
404+
316405
protected function deleteColumn(string $collection, string $column): bool
317406
{
318407
$sqlTable = "`" . self::$source->getDatabase() . "`.`" . self::$source->getNamespace() . "_" . $collection . "`";

0 commit comments

Comments
 (0)