Skip to content

Commit e76cf96

Browse files
authored
fix(session): finalize interrupted assistant messages (#27254)
1 parent c2723b5 commit e76cf96

2 files changed

Lines changed: 139 additions & 12 deletions

File tree

packages/opencode/src/session/prompt.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,12 +1755,25 @@ NOTE: At any point in time through this workflow you should feel free to ask the
17551755
sessionID,
17561756
}
17571757
yield* sessions.updateMessage(msg)
1758-
const handle = yield* processor.create({
1759-
assistantMessage: msg,
1760-
sessionID,
1761-
model,
1758+
1759+
const finalizeInterruptedAssistant = Effect.gen(function* () {
1760+
if (msg.time.completed) return
1761+
msg.error ??= MessageV2.fromError(new DOMException("Aborted", "AbortError"), {
1762+
providerID: msg.providerID,
1763+
aborted: true,
1764+
})
1765+
msg.time.completed = Date.now()
1766+
yield* sessions.updateMessage(msg)
17621767
})
17631768

1769+
const handle = yield* processor
1770+
.create({
1771+
assistantMessage: msg,
1772+
sessionID,
1773+
model,
1774+
})
1775+
.pipe(Effect.onInterrupt(() => finalizeInterruptedAssistant))
1776+
17641777
const outcome: "break" | "continue" = yield* Effect.gen(function* () {
17651778
const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
17661779
const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
@@ -1859,7 +1872,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the
18591872
})
18601873
}
18611874
return "continue" as const
1862-
}).pipe(Effect.ensuring(instruction.clear(handle.message.id)))
1875+
}).pipe(
1876+
Effect.ensuring(instruction.clear(handle.message.id)),
1877+
Effect.onInterrupt(() => finalizeInterruptedAssistant),
1878+
)
18631879
if (outcome === "break") break
18641880
continue
18651881
}

packages/opencode/test/session/prompt.test.ts

Lines changed: 118 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,16 @@ const lsp = Layer.succeed(
152152
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
153153
const run = SessionRunState.layer.pipe(Layer.provide(status))
154154
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
155-
function makeHttp() {
155+
156+
const processorCreateStarted: Array<() => void> = []
157+
const blockingProcessor = Layer.succeed(
158+
SessionProcessor.Service,
159+
SessionProcessor.Service.of({
160+
create: () => Effect.sync(() => processorCreateStarted.shift()?.()).pipe(Effect.andThen(Effect.never)),
161+
}),
162+
)
163+
164+
function makeHttp(input?: { processor?: "blocking" }) {
156165
const deps = Layer.mergeAll(
157166
Session.defaultLayer,
158167
Snapshot.defaultLayer,
@@ -186,12 +195,15 @@ function makeHttp() {
186195
Layer.provideMerge(deps),
187196
)
188197
const trunc = Truncate.layer.pipe(Layer.provideMerge(deps))
189-
const proc = SessionProcessor.layer.pipe(
190-
Layer.provide(summary),
191-
Layer.provide(Image.defaultLayer),
192-
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
193-
Layer.provideMerge(deps),
194-
)
198+
const proc =
199+
input?.processor === "blocking"
200+
? blockingProcessor
201+
: SessionProcessor.layer.pipe(
202+
Layer.provide(summary),
203+
Layer.provide(Image.defaultLayer),
204+
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
205+
Layer.provideMerge(deps),
206+
)
195207
const compact = SessionCompaction.layer.pipe(
196208
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
197209
Layer.provideMerge(proc),
@@ -218,6 +230,7 @@ function makeHttp() {
218230
}
219231

220232
const it = testEffect(makeHttp())
233+
const race = testEffect(makeHttp({ processor: "blocking" }))
221234
const unix = process.platform !== "win32" ? it.instance : it.instance.skip
222235

223236
// Config that registers a custom "test" provider with a "test-model" model
@@ -341,6 +354,14 @@ const deferredAsPromise = <A>(deferred: Deferred.Deferred<A>): PromiseLike<A> =>
341354
},
342355
})
343356

357+
function defer<T>() {
358+
let resolve!: (value: T | PromiseLike<T>) => void
359+
const promise = new Promise<T>((done) => {
360+
resolve = done
361+
})
362+
return { promise, resolve }
363+
}
364+
344365
const succeedVoid = (deferred: Deferred.Deferred<void>) => {
345366
Effect.runSync(Deferred.succeed(deferred, void 0).pipe(Effect.ignore))
346367
}
@@ -896,6 +917,96 @@ it.instance(
896917
3_000,
897918
)
898919

920+
race.instance(
921+
"finalizes assistant when cancelled before processor creation completes",
922+
() =>
923+
Effect.gen(function* () {
924+
yield* useServerConfig(providerCfg)
925+
processorCreateStarted.length = 0
926+
yield* Effect.addFinalizer(() =>
927+
Effect.sync(() => {
928+
processorCreateStarted.length = 0
929+
}),
930+
)
931+
932+
const prompt = yield* SessionPrompt.Service
933+
const sessions = yield* Session.Service
934+
const chat = yield* sessions.create({ title: "Processor creation race" })
935+
936+
yield* prompt.prompt({
937+
sessionID: chat.id,
938+
agent: "build",
939+
noReply: true,
940+
parts: [{ type: "text", text: "first" }],
941+
})
942+
943+
const firstCreate = defer<void>()
944+
processorCreateStarted.push(firstCreate.resolve)
945+
const first = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
946+
yield* Effect.promise(() => firstCreate.promise)
947+
948+
yield* prompt.cancel(chat.id)
949+
const firstExit = yield* Fiber.await(first)
950+
expect(Exit.isSuccess(firstExit)).toBe(true)
951+
952+
let messages = yield* sessions.messages({ sessionID: chat.id })
953+
const firstInterrupted = messages.at(-1)
954+
expect(firstInterrupted?.info.role).toBe("assistant")
955+
expect(firstInterrupted?.parts).toHaveLength(0)
956+
if (firstInterrupted?.info.role === "assistant") {
957+
expect(firstInterrupted.info.finish).toBeUndefined()
958+
expect(firstInterrupted.info.time.completed).toBeNumber()
959+
expect(firstInterrupted.info.error?.name).toBe("MessageAbortedError")
960+
}
961+
962+
yield* prompt.prompt({
963+
sessionID: chat.id,
964+
agent: "build",
965+
noReply: true,
966+
parts: [{ type: "text", text: "second" }],
967+
})
968+
969+
const secondCreate = defer<void>()
970+
processorCreateStarted.push(secondCreate.resolve)
971+
const second = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
972+
yield* Effect.promise(() => secondCreate.promise)
973+
974+
yield* prompt.cancel(chat.id)
975+
const secondExit = yield* Fiber.await(second)
976+
expect(Exit.isSuccess(secondExit)).toBe(true)
977+
978+
messages = yield* sessions.messages({ sessionID: chat.id })
979+
const poisonMessages = messages.filter(
980+
(message) =>
981+
message.info.role === "assistant" &&
982+
message.parts.length === 0 &&
983+
!message.info.finish &&
984+
!message.info.time.completed &&
985+
!message.info.error,
986+
)
987+
expect(poisonMessages).toHaveLength(0)
988+
989+
const interruptedMessages = messages.filter(
990+
(message) =>
991+
message.info.role === "assistant" &&
992+
message.parts.length === 0 &&
993+
message.info.time.completed &&
994+
message.info.error?.name === "MessageAbortedError",
995+
)
996+
expect(interruptedMessages).toHaveLength(2)
997+
998+
const lastUser = messages.at(-2)
999+
const lastAssistant = messages.at(-1)
1000+
expect(lastUser?.info.role).toBe("user")
1001+
expect(lastAssistant?.info.role).toBe("assistant")
1002+
if (lastUser?.info.role === "user" && lastAssistant?.info.role === "assistant") {
1003+
expect(lastAssistant.info.parentID).toBe(lastUser?.info.id)
1004+
}
1005+
}),
1006+
{ git: true },
1007+
3_000,
1008+
)
1009+
8991010
it.instance(
9001011
"cancel finalizes subtask tool state",
9011012
() =>

0 commit comments

Comments
 (0)