Skip to content

Commit 4c82edb

Browse files
authored
fix optimistic state removal from queries (#78)
1 parent 55919b5 commit 4c82edb

7 files changed

Lines changed: 344 additions & 12 deletions

File tree

.changeset/salty-states-fry.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@tanstack/react-db": patch
3+
"@tanstack/db": patch
4+
---
5+
6+
Fixed an issue with injecting the optimistic state removal into the reactive live query.

.changeset/strong-groups-hunt.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db-collections": patch
3+
---
4+
5+
Added QueryCollection

packages/db/src/collection.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,15 @@ export class Collection<T extends object = Record<string, unknown>> {
288288
prevDepVals,
289289
}) => {
290290
const prevDerivedState = prevDepVals?.[0] ?? new Map<string, T>()
291+
const prevOptimisticOperations = prevDepVals?.[1] ?? []
291292
const changedKeys = new Set(this.syncedKeys)
292293
optimisticOperations
293294
.flat()
294295
.filter((op) => op.isActive)
295296
.forEach((op) => changedKeys.add(op.key))
297+
prevOptimisticOperations.flat().forEach((op) => {
298+
changedKeys.add(op.key)
299+
})
296300

297301
if (changedKeys.size === 0) {
298302
return []
@@ -309,12 +313,17 @@ export class Collection<T extends object = Record<string, unknown>> {
309313
} else if (!prevDerivedState.has(key) && derivedState.has(key)) {
310314
changes.push({ type: `insert`, key, value: derivedState.get(key)! })
311315
} else if (prevDerivedState.has(key) && derivedState.has(key)) {
312-
changes.push({
313-
type: `update`,
314-
key,
315-
value: derivedState.get(key)!,
316-
previousValue: prevDerivedState.get(key),
317-
})
316+
const value = derivedState.get(key)!
317+
const previousValue = prevDerivedState.get(key)
318+
if (value !== previousValue) {
319+
// Comparing objects by reference as records are not mutated
320+
changes.push({
321+
type: `update`,
322+
key,
323+
value,
324+
previousValue,
325+
})
326+
}
318327
}
319328
}
320329

packages/db/src/transactions.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { batch } from "@tanstack/store"
12
import { createDeferred } from "./deferred"
23
import type { Deferred } from "./deferred"
34
import type {
@@ -178,8 +179,10 @@ export class Transaction {
178179
try {
179180
await this.mutationFn({ transaction: this })
180181

181-
this.setState(`completed`)
182-
this.touchCollection()
182+
batch(() => {
183+
this.setState(`completed`)
184+
this.touchCollection()
185+
})
183186

184187
this.isPersisted.resolve(this)
185188
} catch (error) {

packages/db/tests/collection-subscribe-changes.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ describe(`Collection.subscribeChanges`, () => {
399399
await tx.isPersisted.promise
400400

401401
// Verify synced update was emitted
402-
expect(callback).toHaveBeenCalledTimes(1)
402+
expect(callback).toHaveBeenCalledTimes(0)
403403
// This is called 1 time when the mutationFn call returns
404404
// and the optimistic state is dropped and the synced state applied.
405405
callback.mockReset()
@@ -438,7 +438,7 @@ describe(`Collection.subscribeChanges`, () => {
438438
await updateTx?.isPersisted.promise
439439

440440
// Verify synced update was emitted
441-
expect(callback).toHaveBeenCalledTimes(1)
441+
expect(callback).toHaveBeenCalledTimes(2) // FIXME: check is we can reduce this
442442
// This is called 1 time when the mutationFn call returns
443443
// and the optimistic state is dropped and the synced state applied.
444444
callback.mockReset()
@@ -556,7 +556,7 @@ describe(`Collection.subscribeChanges`, () => {
556556
await waitForChanges()
557557

558558
// Verify synced update was emitted
559-
expect(callback).toHaveBeenCalledTimes(1)
559+
expect(callback).toHaveBeenCalledTimes(0)
560560
// This is called when the mutationFn returns and
561561
// the optimistic state is dropped and synced state is
562562
// applied.

packages/db/tests/query/query-collection.test.ts

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import mitt from "mitt"
33
import { Collection } from "../../src/collection.js"
44
import { queryBuilder } from "../../src/query/query-builder.js"
55
import { compileQuery } from "../../src/query/compiled-query.js"
6+
import { createTransaction } from "../../src/transactions.js"
67
import type { PendingMutation } from "../../src/types.js"
78

89
type Person = {
@@ -562,6 +563,139 @@ describe(`Query Collections`, () => {
562563
{ id: `1`, name: `John Doe`, age: 40, _orderByIndex: 2 },
563564
])
564565
})
566+
567+
it(`optimistic state is dropped after commit`, async () => {
568+
const emitter = mitt()
569+
570+
// Create person collection
571+
const personCollection = new Collection<Person>({
572+
id: `person-collection-test-bug`,
573+
sync: {
574+
sync: ({ begin, write, commit }) => {
575+
// @ts-expect-error Mitt typing doesn't match our usage
576+
emitter.on(`sync-person`, (changes: Array<PendingMutation>) => {
577+
begin()
578+
changes.forEach((change) => {
579+
write({
580+
key: change.key,
581+
type: change.type,
582+
value: change.changes as Person,
583+
})
584+
})
585+
commit()
586+
})
587+
},
588+
},
589+
})
590+
591+
// Create issue collection
592+
const issueCollection = new Collection<Issue>({
593+
id: `issue-collection-test-bug`,
594+
sync: {
595+
sync: ({ begin, write, commit }) => {
596+
// @ts-expect-error Mitt typing doesn't match our usage
597+
emitter.on(`sync-issue`, (changes: Array<PendingMutation>) => {
598+
begin()
599+
changes.forEach((change) => {
600+
write({
601+
key: change.key,
602+
type: change.type,
603+
value: change.changes as Issue,
604+
})
605+
})
606+
commit()
607+
})
608+
},
609+
},
610+
})
611+
612+
// Sync initial person data
613+
emitter.emit(
614+
`sync-person`,
615+
initialPersons.map((person) => ({
616+
key: person.id,
617+
type: `insert`,
618+
changes: person,
619+
}))
620+
)
621+
622+
// Sync initial issue data
623+
emitter.emit(
624+
`sync-issue`,
625+
initialIssues.map((issue) => ({
626+
key: issue.id,
627+
type: `insert`,
628+
changes: issue,
629+
}))
630+
)
631+
632+
// Create a query with a join between persons and issues
633+
const query = queryBuilder()
634+
.from({ issues: issueCollection })
635+
.join({
636+
type: `inner`,
637+
from: { persons: personCollection },
638+
on: [`@persons.id`, `=`, `@issues.userId`],
639+
})
640+
.select(`@issues.id`, `@issues.title`, `@persons.name`)
641+
.keyBy(`@id`)
642+
643+
const compiledQuery = compileQuery(query)
644+
compiledQuery.start()
645+
646+
const result = compiledQuery.results
647+
648+
await waitForChanges()
649+
650+
// Verify initial state
651+
expect(result.state.size).toBe(3)
652+
653+
// Create a transaction to perform an optimistic mutation
654+
const tx = createTransaction({
655+
mutationFn: async () => {
656+
emitter.emit(`sync-issue`, [
657+
{
658+
key: `4`,
659+
type: `insert`,
660+
changes: {
661+
id: `4`,
662+
title: `New Issue`,
663+
description: `New Issue Description`,
664+
userId: `1`,
665+
},
666+
},
667+
])
668+
return Promise.resolve()
669+
},
670+
})
671+
672+
// Perform optimistic insert of a new issue
673+
tx.mutate(() =>
674+
issueCollection.insert(
675+
{
676+
id: `temp-key`,
677+
title: `New Issue`,
678+
description: `New Issue Description`,
679+
userId: `1`,
680+
},
681+
{ key: `temp-key` }
682+
)
683+
)
684+
685+
// Verify optimistic state is immediately reflected
686+
expect(result.state.size).toBe(4)
687+
expect(result.state.get(`temp-key`)).toEqual({
688+
id: `temp-key`,
689+
name: `John Doe`,
690+
title: `New Issue`,
691+
})
692+
693+
// Wait for the transaction to be committed
694+
await tx.isPersisted.promise
695+
696+
expect(result.state.size).toBe(4)
697+
expect(result.state.get(`4`)).toBeDefined()
698+
})
565699
})
566700

567701
async function waitForChanges(ms = 0) {

0 commit comments

Comments
 (0)