diff --git a/packages/opencode/src/cli/cmd/tui/context/sync.tsx b/packages/opencode/src/cli/cmd/tui/context/sync.tsx index 9f8a384f777f..4b4d01244ab8 100644 --- a/packages/opencode/src/cli/cmd/tui/context/sync.tsx +++ b/packages/opencode/src/cli/cmd/tui/context/sync.tsx @@ -113,6 +113,14 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ const kv = useKV() const fullSyncedSessions = new Set() + const syncingSessions = new Map>() + const hydratingSessions = new Map; parts: Set }>() + const touchMessage = (sessionID: string, messageID: string) => { + hydratingSessions.get(sessionID)?.messages.add(messageID) + } + const touchPart = (sessionID: string, partID: string) => { + hydratingSessions.get(sessionID)?.parts.add(partID) + } function sessionListQuery(): { scope?: "project"; path?: string } { if (!kv.get("session_directory_filter_enabled", true)) return { scope: "project" } @@ -251,6 +259,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ } case "message.updated": { + touchMessage(event.properties.info.sessionID, event.properties.info.id) const messages = store.message[event.properties.info.sessionID] if (!messages) { setStore("message", event.properties.info.sessionID, [event.properties.info]) @@ -290,6 +299,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ break } case "message.removed": { + touchMessage(event.properties.sessionID, event.properties.messageID) const messages = store.message[event.properties.sessionID] const result = Binary.search(messages, event.properties.messageID, (m) => m.id) if (result.found) { @@ -304,6 +314,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ break } case "message.part.updated": { + touchPart(event.properties.part.sessionID, event.properties.part.id) const parts = store.part[event.properties.part.messageID] if (!parts) { setStore("part", event.properties.part.messageID, [event.properties.part]) @@ -329,6 +340,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ if (!parts) break const result = Binary.search(parts, event.properties.partID, (p) => p.id) if (!result.found) break + touchPart(event.properties.sessionID, event.properties.partID) setStore( "part", event.properties.messageID, @@ -343,6 +355,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ } case "message.part.removed": { + touchPart(event.properties.sessionID, event.properties.partID) const parts = store.part[event.properties.messageID] const result = Binary.search(parts, event.properties.partID, (p) => p.id) if (result.found) { @@ -520,28 +533,76 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ }, async sync(sessionID: string) { if (fullSyncedSessions.has(sessionID)) return - const [session, messages, todo, diff] = await Promise.all([ - sdk.client.session.get({ sessionID }, { throwOnError: true }), - sdk.client.session.messages({ sessionID, limit: 100 }), - sdk.client.session.todo({ sessionID }), - sdk.client.session.diff({ sessionID }), - ]) - setStore( - produce((draft) => { - const match = Binary.search(draft.session, sessionID, (s) => s.id) - if (match.found) draft.session[match.index] = session.data! - if (!match.found) draft.session.splice(match.index, 0, session.data!) - draft.todo[sessionID] = todo.data ?? [] - const infos: (typeof draft.message)[string] = [] - for (const message of messages.data ?? []) { - infos.push(message.info) - draft.part[message.info.id] = message.parts - } - draft.message[sessionID] = infos - draft.session_diff[sessionID] = diff.data ?? [] - }), - ) - fullSyncedSessions.add(sessionID) + const syncing = syncingSessions.get(sessionID) + if (syncing) return syncing + const tracker = { messages: new Set(), parts: new Set() } + hydratingSessions.set(sessionID, tracker) + const task = (async () => { + const [session, messages, todo, diff] = await Promise.all([ + sdk.client.session.get({ sessionID }, { throwOnError: true }), + sdk.client.session.messages({ sessionID, limit: 100 }), + sdk.client.session.todo({ sessionID }), + sdk.client.session.diff({ sessionID }), + ]) + setStore( + produce((draft) => { + const match = Binary.search(draft.session, sessionID, (s) => s.id) + if (match.found) draft.session[match.index] = session.data! + if (!match.found) draft.session.splice(match.index, 0, session.data!) + draft.todo[sessionID] = todo.data ?? [] + const currentMessages = draft.message[sessionID] ?? [] + const infos = (messages.data ?? []).flatMap((message) => { + if (!tracker.messages.has(message.info.id)) return [message.info] + const current = currentMessages.find((item) => item.id === message.info.id) + return current ? [current] : [] + }) + infos.push( + ...currentMessages.filter( + (message) => tracker.messages.has(message.id) && !infos.some((item) => item.id === message.id), + ), + ) + const removed = infos.slice(0, -100) + const visible = infos.slice(-100) + const visibleIDs = new Set(visible.map((message) => message.id)) + for (const message of messages.data ?? []) { + if (!visibleIDs.has(message.info.id)) { + delete draft.part[message.info.id] + continue + } + const currentParts = draft.part[message.info.id] ?? [] + const parts = message.parts.flatMap((part) => { + const current = currentParts.find((item) => item.id === part.id) + if (tracker.parts.has(part.id)) return current ? [current] : [] + if ( + current && + (part.type === "text" || part.type === "reasoning") && + (current.type === "text" || current.type === "reasoning") && + part.text.length === 0 && + current.text.length > 0 + ) { + return [current] + } + return [part] + }) + parts.push( + ...currentParts.filter( + (part) => tracker.parts.has(part.id) && !parts.some((item) => item.id === part.id), + ), + ) + draft.part[message.info.id] = parts + } + for (const message of removed) delete draft.part[message.id] + draft.message[sessionID] = visible + draft.session_diff[sessionID] = diff.data ?? [] + }), + ) + fullSyncedSessions.add(sessionID) + })().finally(() => { + syncingSessions.delete(sessionID) + hydratingSessions.delete(sessionID) + }) + syncingSessions.set(sessionID, task) + return task }, }, bootstrap, diff --git a/packages/opencode/test/cli/cmd/tui/sync-live-hydration.test.tsx b/packages/opencode/test/cli/cmd/tui/sync-live-hydration.test.tsx new file mode 100644 index 000000000000..21c5b9507140 --- /dev/null +++ b/packages/opencode/test/cli/cmd/tui/sync-live-hydration.test.tsx @@ -0,0 +1,278 @@ +/** @jsxImportSource @opentui/solid */ +import { expect, test } from "bun:test" +import { Global } from "@opencode-ai/core/global" +import type { GlobalEvent } from "@opencode-ai/sdk/v2" +import { tmpdir } from "../../../fixture/fixture" +import { json, mount, wait } from "./sync-fixture" + +const sessionID = "ses_hydration_race" +const messageID = "msg_hydration_race" +const partID = "prt_hydration_race" +const session = { + id: sessionID, + title: "race", + time: { created: 0, updated: 0 }, + version: "1.15.13", + directory: "/tmp/opencode/packages/opencode", +} +const assistant = { + id: messageID, + sessionID, + role: "assistant" as const, + agent: "build", + modelID: "model", + providerID: "test", + mode: "build", + parentID: "msg_user", + path: { cwd: session.directory, root: session.directory }, + cost: 0, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + time: { created: 1, completed: 2 }, +} + +function global(payload: GlobalEvent["payload"]): GlobalEvent { + return { directory: "/tmp/other", project: "proj_test", payload } +} + +test("stale session hydration does not overwrite live message parts", async () => { + const previous = Global.Path.state + await using tmp = await tmpdir() + Global.Path.state = tmp.path + await Bun.write(`${tmp.path}/kv.json`, "{}") + + let resolveMessages!: (response: Response) => void + const messages = new Promise((resolve) => { + resolveMessages = resolve + }) + let requested = false + const { app, emit, sync } = await mount((url) => { + if (url.pathname === `/session/${sessionID}`) return json(session) + if (url.pathname === `/session/${sessionID}/message`) { + requested = true + return messages + } + if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([]) + return undefined + }) + + try { + const hydrate = sync.session.sync(sessionID) + await wait(() => requested) + emit(global({ id: "evt_message", type: "message.updated", properties: { sessionID, info: assistant } })) + emit( + global({ + id: "evt_part", + type: "message.part.updated", + properties: { + sessionID, + time: 2, + part: { id: partID, sessionID, messageID, type: "text", text: "visible live content" }, + }, + }), + ) + await wait(() => sync.data.part[messageID]?.[0]?.type === "text") + + resolveMessages( + json([ + { + info: assistant, + parts: [{ id: partID, sessionID, messageID, type: "text", text: "" }], + }, + ]), + ) + await hydrate + + expect(sync.data.part[messageID][0]).toMatchObject({ text: "visible live content" }) + } finally { + app.renderer.destroy() + Global.Path.state = previous + } +}) + +test("orphan live deltas do not suppress hydrated parts", async () => { + const previous = Global.Path.state + await using tmp = await tmpdir() + Global.Path.state = tmp.path + await Bun.write(`${tmp.path}/kv.json`, "{}") + + let resolveMessages!: (response: Response) => void + const messages = new Promise((resolve) => { + resolveMessages = resolve + }) + let requested = false + const { app, emit, sync } = await mount((url) => { + if (url.pathname === `/session/${sessionID}`) return json(session) + if (url.pathname === `/session/${sessionID}/message`) { + requested = true + return messages + } + if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([]) + return undefined + }) + + try { + const hydrate = sync.session.sync(sessionID) + await wait(() => requested) + emit( + global({ + id: "evt_delta", + type: "message.part.delta", + properties: { sessionID, messageID, partID, field: "text", delta: "ignored until part exists" }, + }), + ) + resolveMessages( + json([{ info: assistant, parts: [{ id: partID, sessionID, messageID, type: "text", text: "hydrated" }] }]), + ) + await hydrate + + expect(sync.data.part[messageID][0]).toMatchObject({ text: "hydrated" }) + } finally { + app.renderer.destroy() + Global.Path.state = previous + } +}) + +test("hydration does not clear text streamed before it starts", async () => { + const previous = Global.Path.state + await using tmp = await tmpdir() + Global.Path.state = tmp.path + await Bun.write(`${tmp.path}/kv.json`, "{}") + + let resolveMessages!: (response: Response) => void + const messages = new Promise((resolve) => { + resolveMessages = resolve + }) + let requested = false + const { app, emit, sync } = await mount((url) => { + if (url.pathname === `/session/${sessionID}`) return json(session) + if (url.pathname === `/session/${sessionID}/message`) { + requested = true + return messages + } + if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([]) + return undefined + }) + + try { + emit(global({ id: "evt_message", type: "message.updated", properties: { sessionID, info: assistant } })) + emit( + global({ + id: "evt_part", + type: "message.part.updated", + properties: { + sessionID, + time: 1, + part: { id: partID, sessionID, messageID, type: "text", text: "" }, + }, + }), + ) + emit( + global({ + id: "evt_delta", + type: "message.part.delta", + properties: { sessionID, messageID, partID, field: "text", delta: "visible streamed content" }, + }), + ) + await wait(() => sync.data.part[messageID]?.[0]?.type === "text" && sync.data.part[messageID][0].text !== "") + const hydrate = sync.session.sync(sessionID) + await wait(() => requested) + resolveMessages(json([{ info: assistant, parts: [{ id: partID, sessionID, messageID, type: "text", text: "" }] }])) + await hydrate + + expect(sync.data.part[messageID][0]).toMatchObject({ text: "visible streamed content" }) + } finally { + app.renderer.destroy() + Global.Path.state = previous + } +}) + +test("live messages merged during hydration retain the 100 message window", async () => { + const previous = Global.Path.state + await using tmp = await tmpdir() + Global.Path.state = tmp.path + await Bun.write(`${tmp.path}/kv.json`, "{}") + + let resolveMessages!: (response: Response) => void + const messages = new Promise((resolve) => { + resolveMessages = resolve + }) + let requested = false + const { app, emit, sync } = await mount((url) => { + if (url.pathname === `/session/${sessionID}`) return json(session) + if (url.pathname === `/session/${sessionID}/message`) { + requested = true + return messages + } + if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([]) + return undefined + }) + + try { + const hydrate = sync.session.sync(sessionID) + await wait(() => requested) + const live = { ...assistant, id: "msg_z_live" } + emit(global({ id: "evt_live", type: "message.updated", properties: { sessionID, info: live } })) + await wait(() => sync.data.message[sessionID]?.some((message) => message.id === live.id) ?? false) + resolveMessages( + json( + Array.from({ length: 100 }, (_, index) => { + const id = `msg_${String(index).padStart(3, "0")}` + return { + info: { ...assistant, id }, + parts: [{ id: `prt_${id}`, sessionID, messageID: id, type: "text", text: id }], + } + }), + ), + ) + await hydrate + + expect(sync.data.message[sessionID]).toHaveLength(100) + expect(sync.data.message[sessionID].at(-1)?.id).toBe(live.id) + expect(sync.data.message[sessionID].some((message) => message.id === "msg_000")).toBe(false) + expect(sync.data.part.msg_000).toBeUndefined() + } finally { + app.renderer.destroy() + Global.Path.state = previous + } +}) + +test("a message removed during hydration does not regain stale parts", async () => { + const previous = Global.Path.state + await using tmp = await tmpdir() + Global.Path.state = tmp.path + await Bun.write(`${tmp.path}/kv.json`, "{}") + + let resolveMessages!: (response: Response) => void + const messages = new Promise((resolve) => { + resolveMessages = resolve + }) + let requested = false + const { app, emit, sync } = await mount((url) => { + if (url.pathname === `/session/${sessionID}`) return json(session) + if (url.pathname === `/session/${sessionID}/message`) { + requested = true + return messages + } + if (url.pathname === `/session/${sessionID}/todo` || url.pathname === `/session/${sessionID}/diff`) return json([]) + return undefined + }) + + try { + emit(global({ id: "evt_message", type: "message.updated", properties: { sessionID, info: assistant } })) + await wait(() => sync.data.message[sessionID]?.length === 1) + const hydrate = sync.session.sync(sessionID) + await wait(() => requested) + emit(global({ id: "evt_removed", type: "message.removed", properties: { sessionID, messageID } })) + await wait(() => sync.data.message[sessionID]?.length === 0) + resolveMessages( + json([{ info: assistant, parts: [{ id: partID, sessionID, messageID, type: "text", text: "stale" }] }]), + ) + await hydrate + + expect(sync.data.message[sessionID]).toEqual([]) + expect(sync.data.part[messageID]).toBeUndefined() + } finally { + app.renderer.destroy() + Global.Path.state = previous + } +})