Skip to content

Commit d9ae7b7

Browse files
authored
preserve optimistic mutations during truncate operations (#659)
* add failing tests * wip fixes * more tests * changeset
1 parent 7cb54be commit d9ae7b7

5 files changed

Lines changed: 780 additions & 47 deletions

File tree

.changeset/gold-readers-taste.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Fixed critical bug where optimistic mutations were lost when their async handlers completed during a truncate operation. The fix captures a snapshot of optimistic state when `truncate()` is called and restores it during commit, then overlays any still-active transactions to handle late-arriving mutations. This ensures client-side optimistic state is preserved through server-initiated must-refetch scenarios.

packages/db/src/collection/state.ts

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,18 @@ import type { CollectionLifecycleManager } from "./lifecycle"
1212
import type { CollectionChangesManager } from "./changes"
1313
import type { CollectionIndexesManager } from "./indexes"
1414

15-
interface PendingSyncedTransaction<T extends object = Record<string, unknown>> {
15+
interface PendingSyncedTransaction<
16+
T extends object = Record<string, unknown>,
17+
TKey extends string | number = string | number,
18+
> {
1619
committed: boolean
1720
operations: Array<OptimisticChangeMessage<T>>
1821
truncate?: boolean
1922
deletedKeys: Set<string | number>
23+
optimisticSnapshot?: {
24+
upserts: Map<TKey, T>
25+
deletes: Set<TKey>
26+
}
2027
}
2128

2229
export class CollectionStateManager<
@@ -33,8 +40,9 @@ export class CollectionStateManager<
3340

3441
// Core state - make public for testing
3542
public transactions: SortedMap<string, Transaction<any>>
36-
public pendingSyncedTransactions: Array<PendingSyncedTransaction<TOutput>> =
37-
[]
43+
public pendingSyncedTransactions: Array<
44+
PendingSyncedTransaction<TOutput, TKey>
45+
> = []
3846
public syncedData: Map<TKey, TOutput> | SortedMap<TKey, TOutput>
3947
public syncedMetadata = new Map<TKey, unknown>()
4048

@@ -442,10 +450,10 @@ export class CollectionStateManager<
442450
},
443451
{
444452
committedSyncedTransactions: [] as Array<
445-
PendingSyncedTransaction<TOutput>
453+
PendingSyncedTransaction<TOutput, TKey>
446454
>,
447455
uncommittedSyncedTransactions: [] as Array<
448-
PendingSyncedTransaction<TOutput>
456+
PendingSyncedTransaction<TOutput, TKey>
449457
>,
450458
hasTruncateSync: false,
451459
}
@@ -455,6 +463,12 @@ export class CollectionStateManager<
455463
// Set flag to prevent redundant optimistic state recalculations
456464
this.isCommittingSyncTransactions = true
457465

466+
// Get the optimistic snapshot from the truncate transaction (captured when truncate() was called)
467+
const truncateOptimisticSnapshot = hasTruncateSync
468+
? committedSyncedTransactions.find((t) => t.truncate)
469+
?.optimisticSnapshot
470+
: null
471+
458472
// First collect all keys that will be affected by sync operations
459473
const changedKeys = new Set<TKey>()
460474
for (const transaction of committedSyncedTransactions) {
@@ -484,13 +498,19 @@ export class CollectionStateManager<
484498
// Handle truncate operations first
485499
if (transaction.truncate) {
486500
// TRUNCATE PHASE
487-
// 1) Emit a delete for every currently-synced key so downstream listeners/indexes
501+
// 1) Emit a delete for every visible key (synced + optimistic) so downstream listeners/indexes
488502
// observe a clear-before-rebuild. We intentionally skip keys already in
489503
// optimisticDeletes because their delete was previously emitted by the user.
490-
for (const key of this.syncedData.keys()) {
491-
if (this.optimisticDeletes.has(key)) continue
504+
// Use the snapshot to ensure we emit deletes for all items that existed at truncate start.
505+
const visibleKeys = new Set([
506+
...this.syncedData.keys(),
507+
...(truncateOptimisticSnapshot?.upserts.keys() || []),
508+
])
509+
for (const key of visibleKeys) {
510+
if (truncateOptimisticSnapshot?.deletes.has(key)) continue
492511
const previousValue =
493-
this.optimisticUpserts.get(key) || this.syncedData.get(key)
512+
truncateOptimisticSnapshot?.upserts.get(key) ||
513+
this.syncedData.get(key)
494514
if (previousValue !== undefined) {
495515
events.push({ type: `delete`, key, value: previousValue })
496516
}
@@ -574,41 +594,14 @@ export class CollectionStateManager<
574594
}
575595
}
576596

577-
// Build re-apply sets from ACTIVE optimistic transactions against the new synced base
578-
// We do not copy maps; we compute intent directly from transactions to avoid drift.
579-
const reapplyUpserts = new Map<TKey, TOutput>()
580-
const reapplyDeletes = new Set<TKey>()
581-
582-
for (const tx of this.transactions.values()) {
583-
if ([`completed`, `failed`].includes(tx.state)) continue
584-
for (const mutation of tx.mutations) {
585-
if (
586-
!this.isThisCollection(mutation.collection) ||
587-
!mutation.optimistic
588-
)
589-
continue
590-
const key = mutation.key as TKey
591-
switch (mutation.type) {
592-
case `insert`:
593-
reapplyUpserts.set(key, mutation.modified as TOutput)
594-
reapplyDeletes.delete(key)
595-
break
596-
case `update`: {
597-
const base = this.syncedData.get(key)
598-
const next = base
599-
? (Object.assign({}, base, mutation.changes) as TOutput)
600-
: (mutation.modified as TOutput)
601-
reapplyUpserts.set(key, next)
602-
reapplyDeletes.delete(key)
603-
break
604-
}
605-
case `delete`:
606-
reapplyUpserts.delete(key)
607-
reapplyDeletes.add(key)
608-
break
609-
}
610-
}
611-
}
597+
// Build re-apply sets from the snapshot taken at the start of this function.
598+
// This prevents losing optimistic state if transactions complete during truncate processing.
599+
const reapplyUpserts = new Map<TKey, TOutput>(
600+
truncateOptimisticSnapshot!.upserts
601+
)
602+
const reapplyDeletes = new Set<TKey>(
603+
truncateOptimisticSnapshot!.deletes
604+
)
612605

613606
// Emit inserts for re-applied upserts, skipping any keys that have an optimistic delete.
614607
// If the server also inserted/updated the same key in this batch, override that value
@@ -660,6 +653,20 @@ export class CollectionStateManager<
660653

661654
// Reset flag and recompute optimistic state for any remaining active transactions
662655
this.isCommittingSyncTransactions = false
656+
657+
// If we had a truncate, restore the preserved optimistic state from the snapshot
658+
// This includes items from transactions that may have completed during processing
659+
if (hasTruncateSync && truncateOptimisticSnapshot) {
660+
for (const [key, value] of truncateOptimisticSnapshot.upserts) {
661+
this.optimisticUpserts.set(key, value)
662+
}
663+
for (const key of truncateOptimisticSnapshot.deletes) {
664+
this.optimisticDeletes.add(key)
665+
}
666+
}
667+
668+
// Always overlay any still-active optimistic transactions so mutations that started
669+
// after the truncate snapshot are preserved.
663670
for (const transaction of this.transactions.values()) {
664671
if (![`completed`, `failed`].includes(transaction.state)) {
665672
for (const mutation of transaction.mutations) {

packages/db/src/collection/sync.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,13 @@ export class CollectionSyncManager<
181181
// - Subsequent synced ops applied on the fresh base
182182
// - Finally, optimistic mutations re-applied on top (single batch)
183183
pendingTransaction.truncate = true
184+
185+
// Capture optimistic state NOW to preserve it even if transactions complete
186+
// before this truncate transaction is committed
187+
pendingTransaction.optimisticSnapshot = {
188+
upserts: new Map(this.state.optimisticUpserts),
189+
deletes: new Set(this.state.optimisticDeletes),
190+
}
184191
},
185192
})
186193
)

packages/db/tests/collection-subscribe-changes.test.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -964,11 +964,11 @@ describe(`Collection.subscribeChanges`, () => {
964964
value: `optimistic insert`,
965965
})
966966

967-
// Verify events are a single batch: deletes for synced keys (1,2), then inserts for preserved optimistic (1,3)
968-
expect(changeEvents.length).toBe(4)
967+
// Verify events are a single batch: deletes for ALL visible keys (1,2,3), then inserts for preserved optimistic (1,3)
968+
expect(changeEvents.length).toBe(5)
969969
const deletes = changeEvents.filter((e) => e.type === `delete`)
970970
const inserts = changeEvents.filter((e) => e.type === `insert`)
971-
expect(deletes.length).toBe(2)
971+
expect(deletes.length).toBe(3)
972972
expect(inserts.length).toBe(2)
973973

974974
const deleteByKey = new Map(deletes.map((e) => [e.key, e]))
@@ -984,6 +984,11 @@ describe(`Collection.subscribeChanges`, () => {
984984
key: 2,
985985
value: { id: 2, value: `initial value 2` },
986986
})
987+
expect(deleteByKey.get(3)).toEqual({
988+
type: `delete`,
989+
key: 3,
990+
value: { id: 3, value: `optimistic insert` },
991+
})
987992

988993
// Insert events for preserved optimistic entries (1 and 3)
989994
expect(insertByKey.get(1)).toEqual({

0 commit comments

Comments
 (0)