Skip to content

Commit cd630ab

Browse files
committed
feat(acp-next): add event routing
1 parent 245f00a commit cd630ab

5 files changed

Lines changed: 552 additions & 30 deletions

File tree

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
import type { AgentSideConnection } from "@agentclientprotocol/sdk"
2+
import * as Log from "@opencode-ai/core/util/log"
3+
import type {
4+
Event,
5+
EventMessagePartDelta,
6+
EventMessagePartUpdated,
7+
OpencodeClient,
8+
Part,
9+
SessionMessageResponse,
10+
} from "@opencode-ai/sdk/v2"
11+
import { Effect } from "effect"
12+
import { ACPNextSession } from "./session"
13+
14+
const log = Log.create({ service: "acp-next-event" })
15+
16+
type Connection = Pick<AgentSideConnection, "sessionUpdate">
17+
type GlobalEventEnvelope = {
18+
payload?: Event
19+
}
20+
type GlobalEventStream = {
21+
stream: AsyncIterable<GlobalEventEnvelope>
22+
}
23+
24+
export function start(input: {
25+
sdk: OpencodeClient
26+
connection: Connection
27+
session: ACPNextSession.Interface
28+
}) {
29+
const subscription = new Subscription(input)
30+
subscription.start()
31+
return subscription
32+
}
33+
34+
export class Subscription {
35+
private readonly abort = new AbortController()
36+
private started = false
37+
38+
constructor(
39+
private readonly input: {
40+
sdk: OpencodeClient
41+
connection: Connection
42+
session: ACPNextSession.Interface
43+
},
44+
) {}
45+
46+
start() {
47+
if (this.started) return
48+
this.started = true
49+
this.run().catch((error: unknown) => {
50+
if (this.abort.signal.aborted) return
51+
log.error("event subscription failed", { error })
52+
})
53+
}
54+
55+
stop() {
56+
this.abort.abort()
57+
}
58+
59+
async handle(event: Event) {
60+
switch (event.type) {
61+
case "message.part.updated":
62+
return this.handlePartUpdated(event)
63+
case "message.part.delta":
64+
return this.handlePartDelta(event)
65+
}
66+
}
67+
68+
private async run() {
69+
while (!this.abort.signal.aborted) {
70+
const events = (await this.input.sdk.global.event({
71+
signal: this.abort.signal,
72+
})) as GlobalEventStream
73+
74+
for await (const event of events.stream) {
75+
if (this.abort.signal.aborted) return
76+
if (!event.payload) continue
77+
await this.handle(event.payload).catch((error: unknown) => {
78+
log.error("failed to handle event", { error, type: event.payload?.type })
79+
})
80+
}
81+
if (!this.abort.signal.aborted) await new Promise((resolve) => setTimeout(resolve, 1000))
82+
}
83+
}
84+
85+
private async handlePartUpdated(event: EventMessagePartUpdated) {
86+
const part = event.properties.part
87+
const sessionId = part.sessionID || event.properties.sessionID
88+
const session = await Effect.runPromise(this.input.session.tryGet(sessionId))
89+
if (!session) return
90+
91+
await Effect.runPromise(
92+
this.input.session.recordPartMetadata({
93+
sessionId: session.id,
94+
messageId: part.messageID,
95+
partId: part.id,
96+
partType: part.type,
97+
role: part.type === "reasoning" ? "assistant" : undefined,
98+
ignored: part.type === "text" ? part.ignored : undefined,
99+
toolCallId: part.type === "tool" ? part.callID : undefined,
100+
metadata: "metadata" in part ? part.metadata : undefined,
101+
}),
102+
)
103+
}
104+
105+
private async handlePartDelta(event: EventMessagePartDelta) {
106+
const props = event.properties
107+
const session = await Effect.runPromise(this.input.session.tryGet(props.sessionID))
108+
if (!session) return
109+
110+
const known = await Effect.runPromise(
111+
this.input.session.tryGetPartMetadata({
112+
sessionId: session.id,
113+
messageId: props.messageID,
114+
partId: props.partID,
115+
}),
116+
)
117+
const metadata =
118+
known?.role && known.partType
119+
? known
120+
: await this.fetchPartMetadata(session.id, session.cwd, props.messageID, props.partID)
121+
if (metadata?.role !== "assistant") return
122+
if (metadata.partType === "text" && props.field === "text" && metadata.ignored !== true) {
123+
await this.input.connection.sessionUpdate({
124+
sessionId: session.id,
125+
update: {
126+
sessionUpdate: "agent_message_chunk",
127+
messageId: props.messageID,
128+
content: {
129+
type: "text",
130+
text: props.delta,
131+
},
132+
},
133+
})
134+
return
135+
}
136+
137+
if (metadata.partType === "reasoning" && props.field === "text") {
138+
await this.input.connection.sessionUpdate({
139+
sessionId: session.id,
140+
update: {
141+
sessionUpdate: "agent_thought_chunk",
142+
messageId: props.messageID,
143+
content: {
144+
type: "text",
145+
text: props.delta,
146+
},
147+
},
148+
})
149+
}
150+
}
151+
152+
private async fetchPartMetadata(sessionId: string, cwd: string, messageId: string, partId: string) {
153+
const message = await this.input.sdk.session
154+
.message(
155+
{
156+
sessionID: sessionId,
157+
messageID: messageId,
158+
directory: cwd,
159+
},
160+
{ throwOnError: true },
161+
)
162+
.then((response) => response.data)
163+
.catch((error: unknown) => {
164+
log.error("unexpected error when fetching message for delta metadata", { error, messageId, partId })
165+
return undefined
166+
})
167+
if (!message) return
168+
169+
const part = message.parts.find((item) => item.id === partId)
170+
if (!part) return
171+
return await this.recordFetchedPart(sessionId, message, part)
172+
}
173+
174+
private async recordFetchedPart(sessionId: string, message: SessionMessageResponse, part: Part) {
175+
return await Effect.runPromise(
176+
this.input.session.recordPartMetadata({
177+
sessionId,
178+
messageId: part.messageID,
179+
partId: part.id,
180+
partType: part.type,
181+
role: message.info.role,
182+
ignored: part.type === "text" ? part.ignored : undefined,
183+
toolCallId: part.type === "tool" ? part.callID : undefined,
184+
metadata: "metadata" in part ? part.metadata : undefined,
185+
}),
186+
)
187+
}
188+
}
189+
190+
export * as ACPNextEvent from "./event"

packages/opencode/src/acp-next/service.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ import {
3131
} from "@agentclientprotocol/sdk"
3232
import { InstallationVersion } from "@opencode-ai/core/installation/version"
3333
import * as Log from "@opencode-ai/core/util/log"
34-
import type { OpencodeClient } from "@opencode-ai/sdk/v2"
34+
import type { Message, OpencodeClient } from "@opencode-ai/sdk/v2"
3535
import { Context, Effect, Layer, ManagedRuntime } from "effect"
3636
import * as ACPNextError from "./error"
3737
import { buildConfigOptions, parseModelSelection } from "./config-option"
3838
import { Directory } from "./directory"
39+
import { ACPNextEvent } from "./event"
3940
import { ACPNextSession } from "./session"
4041
import { ModelID, ProviderID } from "@/provider/schema"
4142
import { Provider } from "@/provider/provider"
@@ -71,10 +72,15 @@ export function make(input: {
7172
connection?: Pick<AgentSideConnection, "sessionUpdate">
7273
directory?: Directory.Interface
7374
session?: ACPNextSession.Interface
75+
eventSubscription?: (subscription: ACPNextEvent.Subscription) => void
7476
}): Interface {
7577
const session = input.session ?? makeSessionService()
7678
const directoryService = input.directory ?? makeDirectoryService(input.sdk)
7779
const registeredMcp = new Map<string, Set<string>>()
80+
if (input.connection) {
81+
const subscription = ACPNextEvent.start({ sdk: input.sdk, connection: input.connection, session })
82+
input.eventSubscription?.(subscription)
83+
}
7884

7985
const initialize = Effect.fn("ACPNext.initialize")(function* (params: InitializeRequest) {
8086
const authMethod: AuthMethod = {
@@ -476,17 +482,13 @@ type SdkResponse<T> = {
476482
}
477483

478484
type MessageInfo = {
479-
readonly role?: string
480-
readonly model?: {
481-
readonly providerID?: string
482-
readonly modelID?: string
483-
readonly variant?: string
484-
}
485-
readonly providerID?: string
486-
readonly modelID?: string
487-
readonly variant?: string
488-
readonly mode?: string
489-
readonly agent?: string
485+
readonly role?: Message["role"]
486+
readonly model?: Extract<Message, { role: "user" }>["model"]
487+
readonly providerID?: Extract<Message, { role: "assistant" }>["providerID"]
488+
readonly modelID?: Extract<Message, { role: "assistant" }>["modelID"]
489+
readonly variant?: Extract<Message, { role: "assistant" }>["variant"]
490+
readonly mode?: Extract<Message, { role: "assistant" }>["mode"]
491+
readonly agent?: Message["agent"]
490492
}
491493

492494
function request<T>(fn: () => Promise<T | SdkResponse<T>>, service?: string) {

packages/opencode/src/acp-next/session.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { McpServer } from "@agentclientprotocol/sdk"
2+
import type { Message, Part } from "@opencode-ai/sdk/v2"
23
import { Context, Effect, Layer, Ref } from "effect"
34
import type { ModelID, ProviderID } from "../provider/schema"
45
import * as ACPNextError from "./error"
@@ -11,6 +12,9 @@ export type SelectedModel = {
1112
export type KnownMessagePartMetadata = {
1213
messageId: string
1314
partId: string
15+
partType?: Part["type"]
16+
role?: Message["role"]
17+
ignored?: boolean
1418
toolCallId?: string
1519
metadata?: unknown
1620
}
@@ -40,6 +44,9 @@ export type RecordPartMetadataInput = {
4044
sessionId: string
4145
messageId: string
4246
partId: string
47+
partType?: Part["type"]
48+
role?: Message["role"]
49+
ignored?: boolean
4350
toolCallId?: string
4451
metadata?: unknown
4552
}
@@ -146,6 +153,9 @@ export const layer = Layer.effect(
146153
const metadata = {
147154
messageId: input.messageId,
148155
partId: input.partId,
156+
partType: input.partType,
157+
role: input.role,
158+
ignored: input.ignored,
149159
toolCallId: input.toolCallId,
150160
metadata: input.metadata,
151161
}

packages/opencode/src/acp-next/usage.ts

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { AgentSideConnection, Usage } from "@agentclientprotocol/sdk"
22
import * as Log from "@opencode-ai/core/util/log"
3+
import type { AssistantMessage as OpenCodeAssistantMessage, Message } from "@opencode-ai/sdk/v2"
34
import { InstanceRef } from "@/effect/instance-ref"
45
import { InstanceStore } from "@/project/instance-store"
56
import { ModelID, ProviderID } from "@/provider/schema"
@@ -8,27 +9,14 @@ import { Context, Effect, Layer, SynchronizedRef } from "effect"
89

910
const log = Log.create({ service: "acp-next-usage" })
1011

11-
export type AssistantTokenCost = {
12-
readonly cost: number
13-
readonly tokens: {
14-
readonly input: number
15-
readonly output: number
16-
readonly reasoning: number
17-
readonly cache: {
18-
readonly read: number
19-
readonly write: number
20-
}
21-
}
22-
}
12+
export type AssistantTokenCost = Pick<OpenCodeAssistantMessage, "cost" | "tokens">
2313

24-
export type AssistantMessage = AssistantTokenCost & {
25-
readonly role: "assistant"
26-
readonly providerID?: string
27-
readonly modelID?: string
28-
}
14+
export type AssistantMessage = AssistantTokenCost &
15+
Pick<OpenCodeAssistantMessage, "role"> &
16+
Partial<Pick<OpenCodeAssistantMessage, "providerID" | "modelID">>
2917

3018
export type SessionMessage = {
31-
readonly info: { readonly role: string } | AssistantMessage
19+
readonly info: { readonly role: Message["role"] } | AssistantMessage
3220
}
3321

3422
export type MessagesInput = {

0 commit comments

Comments
 (0)