Skip to content

Commit fbe5117

Browse files
committed
Address Jake's follow-up review: tighten limits, reuse tenantKey, let destination handle duplicates
1 parent 52b189b commit fbe5117

3 files changed

Lines changed: 20 additions & 50 deletions

File tree

src/Database/Database.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7163,11 +7163,11 @@ public function upsertDocumentsWithIncrease(
71637163
$found = $this->authorization->skip(fn () => $this->withTenant($tenant, fn () => $this->silent(
71647164
fn () => $this->find($collection->getId(), [
71657165
Query::equal('$id', $chunk),
7166-
Query::limit(PHP_INT_MAX),
7166+
Query::limit($this->maxQueryValues),
71677167
])
71687168
)));
71697169
foreach ($found as $doc) {
7170-
$existingDocs[$tenant . ':' . $doc->getId()] = $doc;
7170+
$existingDocs[$this->tenantKey($doc)] = $doc;
71717171
}
71727172
}
71737173
}
@@ -7182,7 +7182,7 @@ public function upsertDocumentsWithIncrease(
71827182
$existing = $this->authorization->skip(fn () => $this->silent(
71837183
fn () => $this->find($collection->getId(), [
71847184
Query::equal('$id', $chunk),
7185-
Query::limit(PHP_INT_MAX),
7185+
Query::limit($this->maxQueryValues),
71867186
])
71877187
));
71887188
foreach ($existing as $doc) {

src/Database/Mirror.php

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -601,31 +601,6 @@ public function createDocuments(
601601
?callable $onNext = null,
602602
?callable $onError = null,
603603
): int {
604-
// In skipDuplicates mode, identify which input ids already exist on source.
605-
// These will be silently no-oped by the adapter's INSERT IGNORE and must
606-
// not propagate to destination — a skipped duplicate is not a user write.
607-
$existingIds = [];
608-
if ($this->skipDuplicates) {
609-
$ids = \array_values(\array_filter(
610-
\array_map(fn (Document $d) => $d->getId(), $documents),
611-
fn ($id) => $id !== ''
612-
));
613-
614-
if (!empty($ids)) {
615-
foreach (\array_chunk(\array_unique($ids), \max(1, $this->maxQueryValues)) as $chunk) {
616-
$existing = $this->source->silent(
617-
fn () => $this->source->find($collection, [
618-
Query::equal('$id', $chunk),
619-
Query::limit(PHP_INT_MAX),
620-
])
621-
);
622-
foreach ($existing as $doc) {
623-
$existingIds[$doc->getId()] = true;
624-
}
625-
}
626-
}
627-
}
628-
629604
$modified = $this->skipDuplicates
630605
? $this->source->skipDuplicates(
631606
fn () => $this->source->createDocuments($collection, $documents, $batchSize, $onNext, $onError)
@@ -644,22 +619,13 @@ public function createDocuments(
644619
return $modified;
645620
}
646621

647-
// In skipDuplicates mode, drop pre-existing ids so their no-op writes
648-
// don't propagate. Non-skip mode forwards everything as before.
649-
$toForward = $this->skipDuplicates
650-
? \array_values(\array_filter(
651-
$documents,
652-
fn (Document $d) => $d->getId() === '' || !isset($existingIds[$d->getId()])
653-
))
654-
: $documents;
655-
656-
if (empty($toForward)) {
657-
return $modified;
658-
}
659-
622+
// Forward every input to destination. "upgraded" status means the schema
623+
// is mirrored, not that every row is backfilled, so a row that is a
624+
// duplicate on source may not yet exist on destination. In skipDuplicates
625+
// mode the destination runs its own INSERT IGNORE and decides per-row.
660626
try {
661627
$clones = [];
662-
foreach ($toForward as $document) {
628+
foreach ($documents as $document) {
663629
$clone = clone $document;
664630
foreach ($this->writeFilters as $filter) {
665631
$clone = $filter->beforeCreateDocument(

tests/e2e/Adapter/MirrorTest.php

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public function testDeleteMirroredDocument(): void
313313
$this->assertTrue($database->getDestination()->getDocument('testDeleteMirroredDocument', $document->getId())->isEmpty());
314314
}
315315

316-
public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): void
316+
public function testCreateDocumentsSkipDuplicatesBackfillsDestination(): void
317317
{
318318
$database = $this->getDatabase();
319319
$collection = 'mirrorSkipDup';
@@ -331,7 +331,9 @@ public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): vo
331331
], documentSecurity: false);
332332

333333
// Seed the SOURCE only (bypass the mirror) with the row we want to
334-
// skipDuplicates over later. Destination intentionally does NOT have it.
334+
// skip as a duplicate later. Destination intentionally does NOT have it —
335+
// this simulates an in-flight backfill where the collection is marked
336+
// 'upgraded' (schema mirrored) but not every row has reached destination.
335337
$database->getSource()->createDocument($collection, new Document([
336338
'$id' => 'dup',
337339
'name' => 'Original',
@@ -341,7 +343,6 @@ public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): vo
341343
],
342344
]));
343345

344-
// Sanity check setup
345346
$this->assertSame(
346347
'Original',
347348
$database->getSource()->getDocument($collection, 'dup')->getAttribute('name')
@@ -369,6 +370,7 @@ public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): vo
369370
]),
370371
]));
371372

373+
// Source: INSERT IGNORE — 'dup' is a no-op, keeps 'Original'.
372374
$this->assertSame(
373375
'Original',
374376
$database->getSource()->getDocument($collection, 'dup')->getAttribute('name')
@@ -378,11 +380,13 @@ public function testCreateDocumentsSkipDuplicatesDoesNotDivergeDestination(): vo
378380
$database->getSource()->getDocument($collection, 'fresh')->getAttribute('name')
379381
);
380382

381-
// A source-skipped duplicate is a no-op and must not propagate to
382-
// destination. Only the genuinely-inserted 'fresh' row should mirror.
383-
$this->assertTrue(
384-
$database->getDestination()->getDocument($collection, 'dup')->isEmpty(),
385-
'Source-skipped doc must not be inserted into destination'
383+
// Destination: 'dup' is NOT a duplicate there, so destination's own
384+
// INSERT IGNORE inserts it. This prevents permanent divergence when
385+
// destination is still catching up on rows that already exist on source.
386+
$this->assertSame(
387+
'WouldBe',
388+
$database->getDestination()->getDocument($collection, 'dup')->getAttribute('name'),
389+
'Source-skipped doc must still insert on destination when absent there'
386390
);
387391
$this->assertSame(
388392
'Fresh',

0 commit comments

Comments
 (0)