diff --git a/.changeset/fix-nested-includes-propagation.md b/.changeset/fix-nested-includes-propagation.md new file mode 100644 index 000000000..1fa6e00c1 --- /dev/null +++ b/.changeset/fix-nested-includes-propagation.md @@ -0,0 +1,5 @@ +--- +'@tanstack/db': patch +--- + +Fix nested `toArray()` includes not propagating changes at depth 3+. When a query used nested includes like `toArray(runs) → toArray(texts) → concat(toArray(textDeltas))`, changes to the deepest level (e.g., inserting a textDelta) were silently lost because `flushIncludesState` only drained one level of nested buffers. Also throw a clear error when `toArray()` or `concat(toArray())` is used inside expressions like `coalesce()`, instead of silently producing incorrect results. diff --git a/packages/db/src/query/builder/functions.ts b/packages/db/src/query/builder/functions.ts index 84e203589..9ab4e735d 100644 --- a/packages/db/src/query/builder/functions.ts +++ b/packages/db/src/query/builder/functions.ts @@ -437,12 +437,14 @@ export const operators = [ export type OperatorName = (typeof operators)[number] export class ToArrayWrapper<_T = unknown> { + readonly __brand = `ToArrayWrapper` as const declare readonly _type: `toArray` declare readonly _result: _T constructor(public readonly query: QueryBuilder) {} } export class ConcatToArrayWrapper<_T = unknown> { + readonly __brand = `ConcatToArrayWrapper` as const declare readonly _type: `concatToArray` declare readonly _result: _T constructor(public readonly query: QueryBuilder) {} diff --git a/packages/db/src/query/builder/ref-proxy.ts b/packages/db/src/query/builder/ref-proxy.ts index 3e3bea2a4..d7ea8d786 100644 --- a/packages/db/src/query/builder/ref-proxy.ts +++ b/packages/db/src/query/builder/ref-proxy.ts @@ -284,8 +284,10 @@ export function createRefProxyWithSelected>( } /** - * Converts a value to an Expression - * If it's a RefProxy, creates a Ref, otherwise creates a Value + * Converts a value to an Expression. + * If it's a RefProxy, creates a PropRef. Throws if the value is a + * ToArrayWrapper or ConcatToArrayWrapper (these must be used as direct + * select fields). Otherwise wraps it as a Value. */ export function toExpression(value: T): BasicExpression export function toExpression(value: RefProxy): BasicExpression @@ -293,6 +295,20 @@ export function toExpression(value: any): BasicExpression { if (isRefProxy(value)) { return new PropRef(value.__path) } + // toArray() and concat(toArray()) must be used as direct select fields, not inside expressions + if ( + value && + typeof value === `object` && + (value.__brand === `ToArrayWrapper` || + value.__brand === `ConcatToArrayWrapper`) + ) { + const name = + value.__brand === `ToArrayWrapper` ? `toArray()` : `concat(toArray())` + throw new Error( + `${name} cannot be used inside expressions (e.g., coalesce(), eq(), not()). ` + + `Use ${name} directly as a select field value instead.`, + ) + } // If it's already an Expression (Func, Ref, Value) or Agg, return it directly if ( value && diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 3a4e948e1..7353b2116 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1699,6 +1699,30 @@ function flushIncludesState( ) } } + // Finally: entries with deep nested buffer changes (grandchild-or-deeper buffers + // have pending data, but neither this level nor the immediate child level changed). + // Without this pass, changes at depth 3+ are stranded because drainNestedBuffers + // only drains one level and Phase 4 only flushes entries dirty from Phase 2/3. + const deepBufferDirty = new Set() + if (state.nestedSetups) { + for (const [correlationKey, entry] of state.childRegistry) { + if (entriesWithChildChanges.has(correlationKey)) continue + if (dirtyFromBuffers.has(correlationKey)) continue + if ( + entry.includesStates && + hasPendingIncludesChanges(entry.includesStates) + ) { + flushIncludesState( + entry.includesStates, + entry.collection, + entry.collection.id, + null, + entry.syncMethods, + ) + deepBufferDirty.add(correlationKey) + } + } + } // For inline materializations: re-emit affected parents with updated snapshots. // We mutate items in-place (so collection.get() reflects changes immediately) @@ -1707,7 +1731,11 @@ function flushIncludesState( // deepEquals, but in-place mutation means both sides reference the same // object, so the comparison always returns true and suppresses the event. const inlineReEmitKeys = materializesInline(state) - ? new Set([...(affectedCorrelationKeys || []), ...dirtyFromBuffers]) + ? new Set([ + ...(affectedCorrelationKeys || []), + ...dirtyFromBuffers, + ...deepBufferDirty, + ]) : null if (parentSyncMethods && inlineReEmitKeys && inlineReEmitKeys.size > 0) { const events: Array> = [] diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index 9d923bdfa..a0b490bdf 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -1,6 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { and, + coalesce, concat, count, createLiveQueryCollection, @@ -9,7 +10,9 @@ import { } from '../../src/query/index.js' import { createCollection } from '../../src/collection/index.js' import { CleanupQueue } from '../../src/collection/cleanup-queue.js' +import { localOnlyCollectionOptions } from '../../src/local-only.js' import { mockSyncCollectionOptions, stripVirtualProps } from '../utils.js' +import type { SyncConfig } from '../../src/types.js' type Project = { id: number @@ -4061,4 +4064,1064 @@ describe(`includes subqueries`, () => { ]) }) }) + + describe(`toArray/concat(toArray) inside expressions throws`, () => { + it(`throws a clear error when concat(toArray()) is wrapped in coalesce()`, () => { + type Message = { id: number; role: string } + type Chunk = { + id: number + messageId: number + text: string + timestamp: number + } + + const messages = createCollection( + mockSyncCollectionOptions({ + id: `bug1-messages`, + getKey: (m) => m.id, + initialData: [{ id: 1, role: `assistant` }], + }), + ) + + const chunks = createCollection( + mockSyncCollectionOptions({ + id: `bug1-chunks`, + getKey: (c) => c.id, + initialData: [{ id: 10, messageId: 1, text: `Hello`, timestamp: 1 }], + }), + ) + + expect(() => + createLiveQueryCollection((q) => + q.from({ m: messages }).select(({ m }) => ({ + id: m.id, + content: coalesce( + concat( + toArray( + q + .from({ c: chunks }) + .where(({ c }) => eq(c.messageId, m.id)) + .orderBy(({ c }) => c.timestamp) + .select(({ c }) => c.text), + ), + ) as any, + ``, + ), + })), + ), + ).toThrow(`concat(toArray()) cannot be used inside expressions`) + }) + + it(`toArray() wrapped in coalesce() also throws`, () => { + type Parent = { id: number } + type Child = { id: number; parentId: number } + + const parents = createCollection( + mockSyncCollectionOptions({ + id: `bug1b-parents`, + getKey: (p) => p.id, + initialData: [{ id: 1 }], + }), + ) + + const children = createCollection( + mockSyncCollectionOptions({ + id: `bug1b-children`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + expect(() => + createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: coalesce( + toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ id: c.id })), + ) as any, + [], + ), + })), + ), + ).toThrow(`toArray() cannot be used inside expressions`) + }) + }) + + describe(`sequential inserts into toArray child`, () => { + it(`second insert propagates (mockSync)`, async () => { + type Parent = { id: number; name: string } + type Child = { id: number; parentId: number; title: string } + + const parents = createCollection( + mockSyncCollectionOptions({ + id: `bug2a-parents`, + getKey: (p) => p.id, + initialData: [{ id: 1, name: `Alpha` }], + }), + ) + + const children = createCollection( + mockSyncCollectionOptions({ + id: `bug2a-children`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ + id: c.id, + title: c.title, + })), + ), + })), + ) + + await collection.preload() + expect((collection.get(1) as any).items).toEqual([]) + + // First insert + children.utils.begin() + children.utils.write({ + type: `insert`, + value: { id: 10, parentId: 1, title: `First` }, + }) + children.utils.commit() + expect((collection.get(1) as any).items).toHaveLength(1) + + // Second insert + children.utils.begin() + children.utils.write({ + type: `insert`, + value: { id: 11, parentId: 1, title: `Second` }, + }) + children.utils.commit() + expect((collection.get(1) as any).items).toHaveLength(2) + }) + + it(`second insert propagates (localOnly + collection.insert)`, async () => { + type Parent = { id: number; name: string } + type Child = { id: number; parentId: number; title: string } + + const parents = createCollection( + localOnlyCollectionOptions({ + id: `bug2b-parents`, + getKey: (p) => p.id, + initialData: [{ id: 1, name: `Alpha` }], + }), + ) + + const children = createCollection( + localOnlyCollectionOptions({ + id: `bug2b-children`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ p: parents }).select(({ p }) => ({ + id: p.id, + items: toArray( + q + .from({ c: children }) + .where(({ c }) => eq(c.parentId, p.id)) + .select(({ c }) => ({ + id: c.id, + title: c.title, + })), + ), + })), + ) + + await collection.preload() + expect((collection.get(1) as any).items).toEqual([]) + + // First insert via collection.insert() + children.insert({ id: 10, parentId: 1, title: `First` }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).items).toHaveLength(1) + + // Second insert via collection.insert() + children.insert({ id: 11, parentId: 1, title: `Second` }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).items).toHaveLength(2) + }) + + it(`second insert propagates via concat(toArray)`, async () => { + type Message = { id: number; role: string } + type Chunk = { + id: number + messageId: number + text: string + timestamp: number + } + + const messages = createCollection( + localOnlyCollectionOptions({ + id: `bug2c-messages`, + getKey: (m) => m.id, + initialData: [{ id: 1, role: `assistant` }], + }), + ) + + const chunks = createCollection( + localOnlyCollectionOptions({ + id: `bug2c-chunks`, + getKey: (c) => c.id, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ m: messages }).select(({ m }) => ({ + id: m.id, + content: concat( + toArray( + q + .from({ c: chunks }) + .where(({ c }) => eq(c.messageId, m.id)) + .orderBy(({ c }) => c.timestamp) + .select(({ c }) => c.text), + ), + ), + })), + ) + + await collection.preload() + expect((collection.get(1) as any).content).toBe(``) + + // First insert + chunks.insert({ + id: 10, + messageId: 1, + text: `Hello`, + timestamp: 1, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).content).toBe(`Hello`) + + // Second insert + chunks.insert({ + id: 11, + messageId: 1, + text: ` world`, + timestamp: 2, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(1) as any).content).toBe(`Hello world`) + }) + + it(`second insert propagates through chained live query collections (darix pattern)`, async () => { + type RawDelta = { + key: string + text_id: string + delta: string + _seq: number + } + type Seed = { key: string } + + const TIMELINE_KEY = `timeline-1` + + const rawDeltas = createCollection( + localOnlyCollectionOptions({ + id: `chained-raw-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + const derivedDeltas = createLiveQueryCollection({ + id: `chained-derived-deltas`, + query: (q: any) => + q.from({ d: rawDeltas }).select(({ d }: any) => ({ + key: d.key, + text_id: d.text_id, + timelineKey: TIMELINE_KEY, + order: d._seq, + delta: d.delta, + })), + }) + + const seeds = createCollection( + localOnlyCollectionOptions({ + id: `chained-seeds`, + getKey: (s) => s.key, + initialData: [{ key: TIMELINE_KEY }], + }), + ) + + const collection = createLiveQueryCollection({ + query: (q: any) => + q.from({ s: seeds }).select(({ s }: any) => ({ + key: s.key, + deltas: toArray( + q + .from({ d: derivedDeltas }) + .where(({ d }: any) => eq(d.timelineKey, s.key)) + .orderBy(({ d }: any) => d.order) + .select(({ d }: any) => ({ + key: d.key, + delta: d.delta, + })), + ), + })), + }) + + await collection.preload() + const data = () => collection.get(TIMELINE_KEY) + + expect(data().deltas).toEqual([]) + + rawDeltas.insert({ key: `td-1`, text_id: `t-1`, delta: `Hello`, _seq: 1 }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().deltas).toHaveLength(1) + expect(data().deltas[0].delta).toBe(`Hello`) + + rawDeltas.insert({ + key: `td-2`, + text_id: `t-1`, + delta: ` world`, + _seq: 2, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().deltas).toHaveLength(2) + }) + + it(`second insert propagates with multiple sibling toArray includes`, async () => { + type Seed = { key: string } + type Text = { key: string; seedKey: string; status: string } + type TextDelta = { + key: string + textId: string + seedKey: string + delta: string + seq: number + } + + const seeds = createCollection( + localOnlyCollectionOptions({ + id: `bug2d-seeds`, + getKey: (s) => s.key, + initialData: [{ key: `seed-1` }], + }), + ) + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `bug2d-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `bug2d-textDeltas`, + getKey: (td) => td.key, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection((q) => + q.from({ s: seeds }).select(({ s }) => ({ + key: s.key, + texts: toArray( + q + .from({ t: texts }) + .where(({ t }) => eq(t.seedKey, s.key)) + .select(({ t }) => ({ + key: t.key, + status: t.status, + })), + ), + textDeltas: toArray( + q + .from({ td: textDeltas }) + .where(({ td }) => eq(td.seedKey, s.key)) + .orderBy(({ td }) => td.seq) + .select(({ td }) => ({ + key: td.key, + textId: td.textId, + delta: td.delta, + })), + ), + })), + ) + + await collection.preload() + + const data = () => collection.get(`seed-1`) as any + + texts.insert({ key: `text-1`, seedKey: `seed-1`, status: `streaming` }) + await new Promise((r) => setTimeout(r, 50)) + expect(data().texts).toHaveLength(1) + + textDeltas.insert({ + key: `td-1`, + textId: `text-1`, + seedKey: `seed-1`, + delta: `Hello`, + seq: 1, + }) + await new Promise((r) => setTimeout(r, 50)) + expect(data().textDeltas).toHaveLength(1) + expect(data().textDeltas[0].delta).toBe(`Hello`) + + textDeltas.insert({ + key: `td-2`, + textId: `text-1`, + seedKey: `seed-1`, + delta: ` world`, + seq: 2, + }) + await new Promise((r) => setTimeout(r, 50)) + expect(data().textDeltas).toHaveLength(2) + }) + }) + + describe(`nested toArray includes (depth 3+)`, () => { + it(`control: flat concat(toArray) propagates delta inserts`, async () => { + type Text = { key: string; _seq: number; status: string } + type TextDelta = { + key: string + text_id: string + _seq: number + delta: string + } + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `nested-ctrl-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `nested-ctrl-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + const collection = createLiveQueryCollection({ + id: `nested-ctrl-live`, + query: (q) => + q.from({ text: texts }).select(({ text }) => ({ + key: text.key, + order: coalesce(text._seq, -1), + status: text.status, + text: concat( + toArray( + q + .from({ delta: textDeltas }) + .where(({ delta }) => eq(delta.text_id, text.key)) + .orderBy(({ delta }) => delta._seq) + .select(({ delta }) => delta.delta), + ), + ), + })), + }) + + await collection.preload() + + texts.insert({ key: `text-1`, _seq: 1, status: `streaming` }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(`text-1`) as any)?.text).toBe(``) + + textDeltas.insert({ + key: `td-1`, + text_id: `text-1`, + _seq: 2, + delta: `Hello`, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(`text-1`) as any)?.text).toBe(`Hello`) + + textDeltas.insert({ + key: `td-2`, + text_id: `text-1`, + _seq: 3, + delta: ` world`, + }) + await new Promise((r) => setTimeout(r, 50)) + expect((collection.get(`text-1`) as any)?.text).toBe(`Hello world`) + }) + + it(`nested toArray(runs) -> toArray(texts) -> concat(toArray(textDeltas)) propagates`, async () => { + const TIMELINE_KEY = `tl-nested` + + type Seed = { key: string } + type Run = { key: string; _seq: number; status: string } + type Text = { + key: string + run_id: string + _seq: number + status: string + } + type TextDelta = { + key: string + text_id: string + run_id: string + _seq: number + delta: string + } + + const seed = createCollection( + localOnlyCollectionOptions({ + id: `nested-seed`, + getKey: (s) => s.key, + initialData: [{ key: TIMELINE_KEY }], + }), + ) + + const runs = createCollection( + localOnlyCollectionOptions({ + id: `nested-runs`, + getKey: (r) => r.key, + initialData: [], + }), + ) + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `nested-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `nested-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + const runsLive = createLiveQueryCollection({ + id: `nested-runs-live`, + query: (q) => + q.from({ run: runs }).select(({ run }) => ({ + timelineKey: TIMELINE_KEY, + key: run.key, + order: coalesce(run._seq, -1), + status: run.status, + })), + }) + + const textsLive = createLiveQueryCollection({ + id: `nested-texts-live`, + query: (q) => + q.from({ text: texts }).select(({ text }) => ({ + timelineKey: TIMELINE_KEY, + key: text.key, + run_id: text.run_id, + order: coalesce(text._seq, -1), + status: text.status, + })), + }) + + const textDeltasLive = createLiveQueryCollection({ + id: `nested-deltas-live`, + query: (q) => + q.from({ delta: textDeltas }).select(({ delta }) => ({ + timelineKey: TIMELINE_KEY, + key: delta.key, + text_id: delta.text_id, + run_id: delta.run_id, + order: coalesce(delta._seq, -1), + delta: delta.delta, + })), + }) + + const timeline = createLiveQueryCollection({ + id: `nested-timeline`, + query: (q) => + q.from({ s: seed }).select(({ s }) => ({ + key: s.key, + runs: toArray( + q + .from({ run: runsLive }) + .where(({ run }) => eq(run.timelineKey, s.key)) + .orderBy(({ run }) => run.order) + .select(({ run }) => ({ + key: run.key, + order: run.order, + status: run.status, + texts: toArray( + q + .from({ text: textsLive }) + .where(({ text }) => eq(text.run_id, run.key)) + .orderBy(({ text }) => text.order) + .select(({ text }) => ({ + key: text.key, + run_id: text.run_id, + order: text.order, + status: text.status, + text: concat( + toArray( + q + .from({ delta: textDeltasLive }) + .where(({ delta }) => eq(delta.text_id, text.key)) + .orderBy(({ delta }) => delta.order) + .select(({ delta }) => delta.delta), + ), + ), + })), + ), + })), + ), + })), + }) + + await timeline.preload() + + const data = () => timeline.get(TIMELINE_KEY) as any + + runs.insert({ key: `run-1`, _seq: 1, status: `started` }) + texts.insert({ + key: `text-1`, + run_id: `run-1`, + _seq: 2, + status: `streaming`, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs).toHaveLength(1) + expect(data().runs[0].texts).toHaveLength(1) + expect(data().runs[0].texts[0].text).toBe(``) + + textDeltas.insert({ + key: `td-1`, + text_id: `text-1`, + run_id: `run-1`, + _seq: 3, + delta: `Hello`, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().runs[0].texts[0].text).toBe(`Hello`) + + textDeltas.insert({ + key: `td-2`, + text_id: `text-1`, + run_id: `run-1`, + _seq: 4, + delta: ` world`, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().runs[0].texts[0].text).toBe(`Hello world`) + }) + + it(`deep buffer change for one parent does not emit spurious update for sibling parent`, async () => { + const TIMELINE_KEY = `tl-spurious` + + type Seed = { key: string } + type Run = { key: string; _seq: number; status: string } + type Text = { + key: string + run_id: string + _seq: number + status: string + } + type TextDelta = { + key: string + text_id: string + run_id: string + _seq: number + delta: string + } + + const seed = createCollection( + localOnlyCollectionOptions({ + id: `spurious-seed`, + getKey: (s) => s.key, + initialData: [{ key: TIMELINE_KEY }], + }), + ) + + const runs = createCollection( + localOnlyCollectionOptions({ + id: `spurious-runs`, + getKey: (r) => r.key, + initialData: [], + }), + ) + + const texts = createCollection( + localOnlyCollectionOptions({ + id: `spurious-texts`, + getKey: (t) => t.key, + initialData: [], + }), + ) + + const textDeltas = createCollection( + localOnlyCollectionOptions({ + id: `spurious-deltas`, + getKey: (d) => d.key, + initialData: [], + }), + ) + + const runsLive = createLiveQueryCollection({ + id: `spurious-runs-live`, + query: (q) => + q.from({ run: runs }).select(({ run }) => ({ + timelineKey: TIMELINE_KEY, + key: run.key, + order: coalesce(run._seq, -1), + status: run.status, + })), + }) + + const textsLive = createLiveQueryCollection({ + id: `spurious-texts-live`, + query: (q) => + q.from({ text: texts }).select(({ text }) => ({ + timelineKey: TIMELINE_KEY, + key: text.key, + run_id: text.run_id, + order: coalesce(text._seq, -1), + status: text.status, + })), + }) + + const textDeltasLive = createLiveQueryCollection({ + id: `spurious-deltas-live`, + query: (q) => + q.from({ delta: textDeltas }).select(({ delta }) => ({ + timelineKey: TIMELINE_KEY, + key: delta.key, + text_id: delta.text_id, + run_id: delta.run_id, + order: coalesce(delta._seq, -1), + delta: delta.delta, + })), + }) + + const timeline = createLiveQueryCollection({ + id: `spurious-timeline`, + query: (q) => + q.from({ s: seed }).select(({ s }) => ({ + key: s.key, + runs: toArray( + q + .from({ run: runsLive }) + .where(({ run }) => eq(run.timelineKey, s.key)) + .orderBy(({ run }) => run.order) + .select(({ run }) => ({ + key: run.key, + order: run.order, + status: run.status, + texts: toArray( + q + .from({ text: textsLive }) + .where(({ text }) => eq(text.run_id, run.key)) + .orderBy(({ text }) => text.order) + .select(({ text }) => ({ + key: text.key, + run_id: text.run_id, + order: text.order, + status: text.status, + text: concat( + toArray( + q + .from({ delta: textDeltasLive }) + .where(({ delta }) => eq(delta.text_id, text.key)) + .orderBy(({ delta }) => delta.order) + .select(({ delta }) => delta.delta), + ), + ), + })), + ), + })), + ), + })), + }) + + await timeline.preload() + + const data = () => timeline.get(TIMELINE_KEY) as any + + runs.insert({ key: `run-1`, _seq: 1, status: `started` }) + runs.insert({ key: `run-2`, _seq: 2, status: `started` }) + texts.insert({ + key: `text-1`, + run_id: `run-1`, + _seq: 3, + status: `streaming`, + }) + texts.insert({ + key: `text-2`, + run_id: `run-2`, + _seq: 4, + status: `streaming`, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs).toHaveLength(2) + expect(data().runs[0].texts[0].text).toBe(``) + expect(data().runs[1].texts[0].text).toBe(``) + + const timelineRowBefore = data() + const run1TextsBefore = timelineRowBefore.runs[0].texts + const updateEvents: Array = [] + timeline.subscribeChanges((changes) => { + for (const change of changes) { + if (change.type === `update`) { + updateEvents.push(change) + } + } + }) + + textDeltas.insert({ + key: `td-1`, + text_id: `text-2`, + run_id: `run-2`, + _seq: 5, + delta: `Hello`, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs[1].texts[0].text).toBe(`Hello`) + expect(data().runs[0].texts[0].text).toBe(``) + + expect(data().runs[0].texts).toBe(run1TextsBefore) + }) + }) + + describe(`many sibling toArray includes with chained derived collections`, () => { + function createSyncCollection( + id: string, + getKey: (item: T) => string | number, + ) { + let syncBegin: () => void + let syncWrite: (msg: { type: string; value: T }) => void + let syncCommit: () => void + + const collection = createCollection({ + id, + getKey, + sync: { + sync: (params: any) => { + syncBegin = params.begin + syncWrite = params.write + syncCommit = params.commit + params.markReady() + return () => {} + }, + } as SyncConfig, + startSync: true, + gcTime: 0, + }) + + return { + collection, + insert(value: T) { + syncBegin!() + syncWrite!({ type: `insert`, value }) + syncCommit!() + }, + } + } + + const TIMELINE_KEY = `timeline` + + type RawItem = { key: string; _seq: number; [k: string]: unknown } + + function createDerivedCollection( + id: string, + source: ReturnType>[`collection`], + extraFields?: (d: any) => Record, + ) { + return createLiveQueryCollection({ + id: `${id}:derived`, + query: (q: any) => + q.from({ d: source }).select(({ d }: any) => ({ + timelineKey: TIMELINE_KEY, + key: d.key, + order: coalesce(d._seq, -1), + ...(extraFields ? extraFields(d) : {}), + })), + }) + } + + it(`second insert propagates with 5 sibling chained toArray includes`, async () => { + const runs = createSyncCollection(`raw-runs`, (r) => r.key) + const texts = createSyncCollection(`raw-texts`, (r) => r.key) + const textDeltas = createSyncCollection( + `raw-textDeltas`, + (r) => r.key, + ) + const toolCalls = createSyncCollection( + `raw-toolCalls`, + (r) => r.key, + ) + const steps = createSyncCollection(`raw-steps`, (r) => r.key) + + const derivedRuns = createDerivedCollection( + `runs`, + runs.collection, + (d) => ({ + status: d.status, + }), + ) + const derivedTexts = createDerivedCollection( + `texts`, + texts.collection, + (d) => ({ + run_id: d.run_id, + status: d.status, + }), + ) + const derivedTextDeltas = createDerivedCollection( + `textDeltas`, + textDeltas.collection, + (d) => ({ + text_id: d.text_id, + run_id: d.run_id, + delta: d.delta, + }), + ) + const derivedToolCalls = createDerivedCollection( + `toolCalls`, + toolCalls.collection, + (d) => ({ + run_id: d.run_id, + tool_name: d.tool_name, + }), + ) + const derivedSteps = createDerivedCollection( + `steps`, + steps.collection, + (d) => ({ + run_id: d.run_id, + step_number: d.step_number, + }), + ) + + const seeds = createCollection({ + id: `seed`, + getKey: (s: { key: string }) => s.key, + sync: { + sync: (params: any) => { + params.begin() + params.write({ type: `insert`, value: { key: TIMELINE_KEY } }) + params.commit() + params.markReady() + return () => {} + }, + } as SyncConfig<{ key: string }>, + startSync: true, + gcTime: 0, + }) + + const collection = createLiveQueryCollection({ + query: (q: any) => + q.from({ s: seeds }).select(({ s }: any) => ({ + key: s.key, + runs: toArray( + q + .from({ r: derivedRuns }) + .where(({ r }: any) => eq(r.timelineKey, s.key)) + .orderBy(({ r }: any) => r.order) + .select(({ r }: any) => ({ key: r.key, status: r.status })), + ), + texts: toArray( + q + .from({ t: derivedTexts }) + .where(({ t }: any) => eq(t.timelineKey, s.key)) + .orderBy(({ t }: any) => t.order) + .select(({ t }: any) => ({ + key: t.key, + run_id: t.run_id, + status: t.status, + })), + ), + textDeltas: toArray( + q + .from({ td: derivedTextDeltas }) + .where(({ td }: any) => eq(td.timelineKey, s.key)) + .orderBy(({ td }: any) => td.order) + .select(({ td }: any) => ({ + key: td.key, + text_id: td.text_id, + delta: td.delta, + })), + ), + toolCalls: toArray( + q + .from({ tc: derivedToolCalls }) + .where(({ tc }: any) => eq(tc.timelineKey, s.key)) + .orderBy(({ tc }: any) => tc.order) + .select(({ tc }: any) => ({ + key: tc.key, + tool_name: tc.tool_name, + })), + ), + steps: toArray( + q + .from({ st: derivedSteps }) + .where(({ st }: any) => eq(st.timelineKey, s.key)) + .orderBy(({ st }: any) => st.order) + .select(({ st }: any) => ({ + key: st.key, + step_number: st.step_number, + })), + ), + })), + }) + + await collection.preload() + + const data = () => collection.get(TIMELINE_KEY) + + runs.insert({ key: `run-1`, status: `started`, _seq: 1 }) + texts.insert({ + key: `text-1`, + run_id: `run-1`, + status: `streaming`, + _seq: 2, + }) + await new Promise((r) => setTimeout(r, 100)) + + expect(data().runs).toHaveLength(1) + expect(data().texts).toHaveLength(1) + expect(data().textDeltas).toHaveLength(0) + + textDeltas.insert({ + key: `td-1`, + text_id: `text-1`, + run_id: `run-1`, + delta: `Hello`, + _seq: 3, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().textDeltas).toHaveLength(1) + expect(data().textDeltas[0].delta).toBe(`Hello`) + + textDeltas.insert({ + key: `td-2`, + text_id: `text-1`, + run_id: `run-1`, + delta: ` world`, + _seq: 4, + }) + await new Promise((r) => setTimeout(r, 100)) + expect(data().textDeltas).toHaveLength(2) + }) + }) })