Skip to content

Commit b0f7068

Browse files
committed
feat(acp): stream acp-next tool updates
1 parent 8845a43 commit b0f7068

4 files changed

Lines changed: 506 additions & 7 deletions

File tree

packages/opencode/src/acp-next/event.ts

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,18 @@ import type {
77
OpencodeClient,
88
Part,
99
SessionMessageResponse,
10+
ToolPart,
1011
} from "@opencode-ai/sdk/v2"
1112
import { Effect } from "effect"
1213
import { ACPNextSession } from "./session"
14+
import {
15+
duplicateRunningToolUpdate,
16+
errorToolUpdate,
17+
pendingToolCall,
18+
runningToolUpdate,
19+
shellOutputSnapshot,
20+
completedToolUpdate,
21+
} from "./tool"
1322

1423
const log = Log.create({ service: "acp-next-event" })
1524

@@ -29,6 +38,8 @@ export function start(input: { sdk: OpencodeClient; connection: Connection; sess
2938

3039
export class Subscription {
3140
private readonly abort = new AbortController()
41+
private readonly shellSnapshots = new Map<string, string>()
42+
private readonly toolStarts = new Set<string>()
3243
private started = false
3344

3445
constructor(
@@ -61,6 +72,17 @@ export class Subscription {
6172
}
6273
}
6374

75+
async replayMessage(message: SessionMessageResponse) {
76+
if (message.info.role !== "assistant" && message.info.role !== "user") return
77+
78+
for (const part of message.parts) {
79+
await this.recordFetchedPart(message.info.sessionID, message, part)
80+
if (part.type === "tool") {
81+
await this.handleToolPart(message.info.sessionID, part)
82+
}
83+
}
84+
}
85+
6486
private async run() {
6587
while (!this.abort.signal.aborted) {
6688
const events = (await this.input.sdk.global.event({
@@ -96,6 +118,9 @@ export class Subscription {
96118
metadata: "metadata" in part ? part.metadata : undefined,
97119
}),
98120
)
121+
if (part.type === "tool") {
122+
await this.handleToolPart(session.id, part)
123+
}
99124
}
100125

101126
private async handlePartDelta(event: EventMessagePartDelta) {
@@ -181,6 +206,106 @@ export class Subscription {
181206
}),
182207
)
183208
}
209+
210+
private async handleToolPart(sessionId: string, part: ToolPart) {
211+
await this.toolStart(sessionId, part)
212+
213+
switch (part.state.status) {
214+
case "pending":
215+
this.shellSnapshots.delete(part.callID)
216+
return
217+
218+
case "running":
219+
await this.runningTool(sessionId, part)
220+
return
221+
222+
case "completed":
223+
this.clearTool(part.callID)
224+
await this.input.connection.sessionUpdate({
225+
sessionId,
226+
update: {
227+
sessionUpdate: "tool_call_update",
228+
...completedToolUpdate({
229+
toolCallId: part.callID,
230+
toolName: part.tool,
231+
state: part.state,
232+
}),
233+
},
234+
})
235+
return
236+
237+
case "error":
238+
this.clearTool(part.callID)
239+
await this.input.connection.sessionUpdate({
240+
sessionId,
241+
update: {
242+
sessionUpdate: "tool_call_update",
243+
...errorToolUpdate({
244+
toolCallId: part.callID,
245+
toolName: part.tool,
246+
state: part.state,
247+
}),
248+
},
249+
})
250+
return
251+
}
252+
}
253+
254+
private async runningTool(sessionId: string, part: ToolPart) {
255+
if (part.state.status !== "running") return
256+
257+
const output = part.tool === "bash" ? shellOutputSnapshot(part.state) : undefined
258+
if (output !== undefined) {
259+
if (this.shellSnapshots.get(part.callID) === output) {
260+
await this.input.connection.sessionUpdate({
261+
sessionId,
262+
update: {
263+
sessionUpdate: "tool_call_update",
264+
...duplicateRunningToolUpdate({
265+
toolCallId: part.callID,
266+
toolName: part.tool,
267+
state: part.state,
268+
}),
269+
},
270+
})
271+
return
272+
}
273+
this.shellSnapshots.set(part.callID, output)
274+
}
275+
276+
await this.input.connection.sessionUpdate({
277+
sessionId,
278+
update: {
279+
sessionUpdate: "tool_call_update",
280+
...runningToolUpdate({
281+
toolCallId: part.callID,
282+
toolName: part.tool,
283+
state: part.state,
284+
output,
285+
}),
286+
},
287+
})
288+
}
289+
290+
private async toolStart(sessionId: string, part: ToolPart) {
291+
if (this.toolStarts.has(part.callID)) return
292+
this.toolStarts.add(part.callID)
293+
await this.input.connection.sessionUpdate({
294+
sessionId,
295+
update: {
296+
sessionUpdate: "tool_call",
297+
...pendingToolCall({
298+
toolCallId: part.callID,
299+
toolName: part.tool,
300+
}),
301+
},
302+
})
303+
}
304+
305+
private clearTool(toolCallId: string) {
306+
this.toolStarts.delete(toolCallId)
307+
this.shellSnapshots.delete(toolCallId)
308+
}
184309
}
185310

186311
export * as ACPNextEvent from "./event"

packages/opencode/src/acp-next/service.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import {
3131
} from "@agentclientprotocol/sdk"
3232
import { InstallationVersion } from "@opencode-ai/core/installation/version"
3333
import * as Log from "@opencode-ai/core/util/log"
34-
import type { Message, OpencodeClient } from "@opencode-ai/sdk/v2"
34+
import type { Message, OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2"
3535
import { Context, Effect, Layer, ManagedRuntime } from "effect"
3636
import * as ACPNextError from "./error"
3737
import { buildConfigOptions, parseModelSelection } from "./config-option"
@@ -77,10 +77,10 @@ export function make(input: {
7777
const session = input.session ?? makeSessionService()
7878
const directoryService = input.directory ?? makeDirectoryService(input.sdk)
7979
const registeredMcp = new Map<string, Set<string>>()
80-
if (input.connection) {
81-
const subscription = ACPNextEvent.start({ sdk: input.sdk, connection: input.connection, session })
82-
input.eventSubscription?.(subscription)
83-
}
80+
const events = input.connection
81+
? ACPNextEvent.start({ sdk: input.sdk, connection: input.connection, session })
82+
: undefined
83+
if (events) input.eventSubscription?.(events)
8484

8585
const initialize = Effect.fn("ACPNext.initialize")(function* (params: InitializeRequest) {
8686
const authMethod: AuthMethod = {
@@ -207,6 +207,7 @@ export function make(input: {
207207

208208
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers)
209209
yield* sendAvailableCommands(input.connection, state.id, snapshot)
210+
yield* replayMessages(events, messages)
210211

211212
return {
212213
configOptions: configOptions(snapshot, {
@@ -276,6 +277,7 @@ export function make(input: {
276277

277278
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? [])
278279
yield* sendAvailableCommands(input.connection, state.id, snapshot)
280+
yield* replayMessages(events, messages)
279281

280282
return {
281283
configOptions: configOptions(snapshot, {
@@ -335,6 +337,7 @@ export function make(input: {
335337

336338
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? [])
337339
yield* sendAvailableCommands(input.connection, state.id, snapshot)
340+
yield* replayMessages(events, messages)
338341

339342
return {
340343
sessionId: state.id,
@@ -470,6 +473,14 @@ function makeDirectoryService(sdk: OpencodeClient) {
470473
).runSync(Directory.Service.use((service) => Effect.succeed(service)))
471474
}
472475

476+
function replayMessages(subscription: ACPNextEvent.Subscription | undefined, messages: SessionMessageResponse[]) {
477+
if (!subscription) return Effect.void
478+
return Effect.tryPromise({
479+
try: () => Promise.all(messages.map((message) => subscription.replayMessage(message))).then(() => undefined),
480+
catch: (error) => fromUnknownError(error, "event"),
481+
})
482+
}
483+
473484
type ConfigState = {
474485
readonly model: Directory.DefaultModel
475486
readonly variant?: string

packages/opencode/src/acp-next/tool.ts

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ToolCallContent, ToolCallLocation, ToolKind } from "@agentclientprotocol/sdk"
1+
import type { ToolCall, ToolCallContent, ToolCallLocation, ToolCallUpdate, ToolKind } from "@agentclientprotocol/sdk"
22

33
export type ToolInput = Record<string, unknown>
44

@@ -16,6 +16,19 @@ export type CompletedToolState = {
1616
readonly attachments?: ReadonlyArray<ToolAttachment>
1717
}
1818

19+
export type RunningToolState = {
20+
readonly status: "running"
21+
readonly input: ToolInput
22+
readonly title?: string
23+
}
24+
25+
export type ErrorToolState = {
26+
readonly status: "error"
27+
readonly input: ToolInput
28+
readonly error: string
29+
readonly metadata?: unknown
30+
}
31+
1932
export type ImageAttachment = {
2033
readonly mimeType: string
2134
readonly data: string
@@ -100,6 +113,104 @@ export function completedToolContent(toolName: string, state: CompletedToolState
100113
return content
101114
}
102115

116+
export function pendingToolCall(input: { readonly toolCallId: string; readonly toolName: string }): ToolCall {
117+
return {
118+
toolCallId: input.toolCallId,
119+
title: input.toolName,
120+
kind: toToolKind(input.toolName),
121+
status: "pending",
122+
locations: [],
123+
rawInput: {},
124+
}
125+
}
126+
127+
export function runningToolUpdate(input: {
128+
readonly toolCallId: string
129+
readonly toolName: string
130+
readonly state: RunningToolState
131+
readonly output?: string
132+
}): ToolCallUpdate {
133+
const content = input.output
134+
? [
135+
{
136+
type: "content" as const,
137+
content: {
138+
type: "text" as const,
139+
text: input.output,
140+
},
141+
},
142+
]
143+
: undefined
144+
145+
return {
146+
toolCallId: input.toolCallId,
147+
status: "in_progress",
148+
kind: toToolKind(input.toolName),
149+
title: input.state.title ?? input.toolName,
150+
locations: toLocations(input.toolName, input.state.input),
151+
rawInput: input.state.input,
152+
...(content ? { content } : {}),
153+
}
154+
}
155+
156+
export function duplicateRunningToolUpdate(input: {
157+
readonly toolCallId: string
158+
readonly toolName: string
159+
readonly state: RunningToolState
160+
}): ToolCallUpdate {
161+
return {
162+
toolCallId: input.toolCallId,
163+
status: "in_progress",
164+
kind: toToolKind(input.toolName),
165+
title: input.state.title ?? input.toolName,
166+
locations: toLocations(input.toolName, input.state.input),
167+
rawInput: input.state.input,
168+
}
169+
}
170+
171+
export function completedToolUpdate(input: {
172+
readonly toolCallId: string
173+
readonly toolName: string
174+
readonly state: CompletedToolState & { readonly title: string }
175+
}): ToolCallUpdate {
176+
return {
177+
toolCallId: input.toolCallId,
178+
status: "completed",
179+
kind: toToolKind(input.toolName),
180+
title: input.state.title,
181+
content: completedToolContent(input.toolName, input.state),
182+
rawInput: input.state.input,
183+
rawOutput: completedToolRawOutput(input.state),
184+
}
185+
}
186+
187+
export function errorToolUpdate(input: {
188+
readonly toolCallId: string
189+
readonly toolName: string
190+
readonly state: ErrorToolState
191+
}): ToolCallUpdate {
192+
return {
193+
toolCallId: input.toolCallId,
194+
status: "failed",
195+
kind: toToolKind(input.toolName),
196+
title: input.toolName,
197+
rawInput: input.state.input,
198+
content: [
199+
{
200+
type: "content",
201+
content: {
202+
type: "text",
203+
text: input.state.error,
204+
},
205+
},
206+
],
207+
rawOutput: {
208+
error: input.state.error,
209+
metadata: input.state.metadata,
210+
},
211+
}
212+
}
213+
103214
export function completedToolRawOutput(state: CompletedToolState) {
104215
return {
105216
output: state.output,
@@ -138,6 +249,11 @@ export const extractLocations = toLocations
138249
export const buildCompletedToolContent = completedToolContent
139250
export const buildCompletedRawOutput = completedToolRawOutput
140251
export const extractShellOutputSnapshot = shellOutputSnapshot
252+
export const buildPendingToolCall = pendingToolCall
253+
export const buildRunningToolUpdate = runningToolUpdate
254+
export const buildDuplicateRunningToolUpdate = duplicateRunningToolUpdate
255+
export const buildCompletedToolUpdate = completedToolUpdate
256+
export const buildErrorToolUpdate = errorToolUpdate
141257

142258
function locationFrom(value: unknown): ToolCallLocation[] {
143259
const path = stringValue(value)

0 commit comments

Comments
 (0)