Skip to content

Commit 70d8ee3

Browse files
committed
fix(acp): replay acp-next history sequentially
1 parent b0f7068 commit 70d8ee3

2 files changed

Lines changed: 91 additions & 6 deletions

File tree

packages/opencode/src/acp-next/service.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -475,9 +475,12 @@ function makeDirectoryService(sdk: OpencodeClient) {
475475

476476
function replayMessages(subscription: ACPNextEvent.Subscription | undefined, messages: SessionMessageResponse[]) {
477477
if (!subscription) return Effect.void
478-
return Effect.tryPromise({
479-
try: () => Promise.all(messages.map((message) => subscription.replayMessage(message))).then(() => undefined),
480-
catch: (error) => fromUnknownError(error, "event"),
478+
return Effect.promise(async () => {
479+
for (const message of messages) {
480+
await subscription.replayMessage(message).catch((error: unknown) => {
481+
log.error("failed to replay ACP message", { error, messageID: message.info.id })
482+
})
483+
}
481484
})
482485
}
483486

packages/opencode/test/acp-next/event.test.ts

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import { Directory } from "@/acp-next/directory"
88
import { ACPNextSession } from "@/acp-next/session"
99

1010
type SessionUpdateParams = Parameters<AgentSideConnection["sessionUpdate"]>[0]
11+
type ToolSessionUpdateParams = SessionUpdateParams & {
12+
update: Extract<SessionUpdateParams["update"], { sessionUpdate: "tool_call" | "tool_call_update" }>
13+
}
1114
type GlobalEventEnvelope = {
1215
payload?: Event
1316
}
@@ -287,9 +290,9 @@ function errorTool(sessionID: string, callID: string) {
287290
}
288291

289292
function toolUpdates(updates: SessionUpdateParams[]) {
290-
return updates.filter(
291-
(item) => item.update.sessionUpdate === "tool_call" || item.update.sessionUpdate === "tool_call_update",
292-
)
293+
return updates.filter((item): item is ToolSessionUpdateParams => {
294+
return item.update.sessionUpdate === "tool_call" || item.update.sessionUpdate === "tool_call_update"
295+
})
293296
}
294297

295298
async function createKnownSession(
@@ -414,6 +417,85 @@ describe("acp-next event routing", () => {
414417
expect(harness.updates).toHaveLength(2)
415418
})
416419

420+
it("replays loaded session messages sequentially and continues after update failures", async () => {
421+
const events = createEventStream()
422+
const updates: SessionUpdateParams[] = []
423+
const connection = {
424+
sessionUpdate: (params: SessionUpdateParams) => {
425+
if (params.update.sessionUpdate === "tool_call" && params.update.toolCallId === "call_slow") {
426+
return new Promise<void>((resolve) => {
427+
setTimeout(() => {
428+
updates.push(params)
429+
resolve()
430+
}, 20)
431+
})
432+
}
433+
434+
if (params.update.sessionUpdate === "tool_call_update" && params.update.toolCallId === "call_slow") {
435+
return Promise.reject(new Error("replay send failed"))
436+
}
437+
438+
updates.push(params)
439+
return Promise.resolve()
440+
},
441+
} satisfies Pick<AgentSideConnection, "sessionUpdate">
442+
let subscription: ACPNextEvent.Subscription | undefined
443+
const service = ACPNextService.make({
444+
sdk: {
445+
global: {
446+
event: (options?: { signal?: AbortSignal }) => Promise.resolve({ stream: events.stream(options?.signal) }),
447+
},
448+
session: {
449+
get: () => Promise.resolve({ data: { id: "ses_loaded" } }),
450+
messages: () =>
451+
Promise.resolve({
452+
data: [
453+
assistantToolMessage(completedTool("ses_loaded", "call_slow", "slow")),
454+
assistantToolMessage(completedTool("ses_loaded", "call_after", "after")),
455+
],
456+
}),
457+
},
458+
} as unknown as OpencodeClient,
459+
connection,
460+
directory: {
461+
get: () =>
462+
Effect.succeed(
463+
Directory.build({
464+
directory: "/workspace",
465+
providers: {},
466+
modes: [],
467+
defaultModeID: "build",
468+
commands: [],
469+
}),
470+
),
471+
refresh: () =>
472+
Effect.succeed(
473+
Directory.build({
474+
directory: "/workspace",
475+
providers: {},
476+
modes: [],
477+
defaultModeID: "build",
478+
commands: [],
479+
}),
480+
),
481+
variants: Directory.variants,
482+
},
483+
eventSubscription: (started) => {
484+
subscription = started
485+
},
486+
})
487+
488+
await Effect.runPromise(service.loadSession({ cwd: "/workspace", sessionId: "ses_loaded", mcpServers: [] }))
489+
490+
expect(toolUpdates(updates).map((item) => item.update.toolCallId)).toEqual([
491+
"call_slow",
492+
"call_after",
493+
"call_after",
494+
])
495+
subscription?.stop()
496+
events.close()
497+
})
498+
417499
it("ignores unknown sessions and live user parts without user_message_chunk duplication", async () => {
418500
const harness = createHarness()
419501
await createKnownSession(harness.session, "ses_user", {

0 commit comments

Comments
 (0)