Skip to content

Commit 8bdcc22

Browse files
authored
refactor(effect): inline session processor interrupt cleanup (#21593)
1 parent 2bdd279 commit 8bdcc22

File tree

9 files changed

+119
-151
lines changed

9 files changed

+119
-151
lines changed

packages/opencode/src/file/time.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export namespace FileTime {
4646
const disableCheck = yield* Flag.OPENCODE_DISABLE_FILETIME_CHECK
4747

4848
const stamp = Effect.fnUntraced(function* (file: string) {
49-
const info = yield* fsys.stat(file).pipe(Effect.catch(() => Effect.succeed(undefined)))
49+
const info = yield* fsys.stat(file).pipe(Effect.catch(() => Effect.void))
5050
return {
5151
read: yield* DateTime.nowAsDate,
5252
mtime: info ? Option.getOrUndefined(info.mtime)?.getTime() : undefined,

packages/opencode/src/mcp/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ export namespace MCP {
501501
return
502502
}
503503

504-
const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.succeed(undefined)))
504+
const result = yield* create(key, mcp).pipe(Effect.catch(() => Effect.void))
505505
if (!result) return
506506

507507
s.status[key] = result.status

packages/opencode/src/project/project.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ export namespace Project {
158158
return yield* fs.readFileString(pathSvc.join(dir, "opencode")).pipe(
159159
Effect.map((x) => x.trim()),
160160
Effect.map(ProjectID.make),
161-
Effect.catch(() => Effect.succeed(undefined)),
161+
Effect.catch(() => Effect.void),
162162
)
163163
})
164164

packages/opencode/src/session/compaction.ts

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -253,23 +253,21 @@ When constructing the summary, try to stick to this template:
253253
sessionID: input.sessionID,
254254
model,
255255
})
256-
const result = yield* processor
257-
.process({
258-
user: userMessage,
259-
agent,
260-
sessionID: input.sessionID,
261-
tools: {},
262-
system: [],
263-
messages: [
264-
...modelMessages,
265-
{
266-
role: "user",
267-
content: [{ type: "text", text: prompt }],
268-
},
269-
],
270-
model,
271-
})
272-
.pipe(Effect.onInterrupt(() => processor.abort()))
256+
const result = yield* processor.process({
257+
user: userMessage,
258+
agent,
259+
sessionID: input.sessionID,
260+
tools: {},
261+
system: [],
262+
messages: [
263+
...modelMessages,
264+
{
265+
role: "user",
266+
content: [{ type: "text", text: prompt }],
267+
},
268+
],
269+
model,
270+
})
273271

274272
if (result === "compact") {
275273
processor.message.error = new MessageV2.ContextOverflowError({

packages/opencode/src/session/processor.ts

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ export namespace SessionProcessor {
3030
export interface Handle {
3131
readonly message: MessageV2.Assistant
3232
readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
33-
readonly abort: () => Effect.Effect<void>
3433
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
3534
}
3635

@@ -429,19 +428,6 @@ export namespace SessionProcessor {
429428
yield* status.set(ctx.sessionID, { type: "idle" })
430429
})
431430

432-
const abort = Effect.fn("SessionProcessor.abort")(() =>
433-
Effect.gen(function* () {
434-
if (!ctx.assistantMessage.error) {
435-
yield* halt(new DOMException("Aborted", "AbortError"))
436-
}
437-
if (!ctx.assistantMessage.time.completed) {
438-
yield* cleanup()
439-
return
440-
}
441-
yield* session.updateMessage(ctx.assistantMessage)
442-
}),
443-
)
444-
445431
const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
446432
log.info("process")
447433
ctx.needsCompaction = false
@@ -459,7 +445,14 @@ export namespace SessionProcessor {
459445
Stream.runDrain,
460446
)
461447
}).pipe(
462-
Effect.onInterrupt(() => Effect.sync(() => void (aborted = true))),
448+
Effect.onInterrupt(() =>
449+
Effect.gen(function* () {
450+
aborted = true
451+
if (!ctx.assistantMessage.error) {
452+
yield* halt(new DOMException("Aborted", "AbortError"))
453+
}
454+
}),
455+
),
463456
Effect.catchCauseIf(
464457
(cause) => !Cause.hasInterruptsOnly(cause),
465458
(cause) => Effect.fail(Cause.squash(cause)),
@@ -480,13 +473,10 @@ export namespace SessionProcessor {
480473
Effect.ensuring(cleanup()),
481474
)
482475

483-
if (aborted && !ctx.assistantMessage.error) {
484-
yield* abort()
485-
}
486476
if (ctx.needsCompaction) return "compact"
487-
if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop"
477+
if (ctx.blocked || ctx.assistantMessage.error) return "stop"
488478
return "continue"
489-
}).pipe(Effect.onInterrupt(() => abort().pipe(Effect.asVoid)))
479+
})
490480
})
491481

492482
return {
@@ -496,7 +486,6 @@ export namespace SessionProcessor {
496486
partFromToolCall(toolCallID: string) {
497487
return ctx.toolcalls[toolCallID]
498488
},
499-
abort,
500489
process,
501490
} satisfies Handle
502491
})

packages/opencode/src/session/prompt.ts

Lines changed: 90 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -964,9 +964,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
964964
const same = ag.model && model.providerID === ag.model.providerID && model.modelID === ag.model.modelID
965965
const full =
966966
!input.variant && ag.variant && same
967-
? yield* provider
968-
.getModel(model.providerID, model.modelID)
969-
.pipe(Effect.catch(() => Effect.succeed(undefined)))
967+
? yield* provider.getModel(model.providerID, model.modelID).pipe(Effect.catchDefect(() => Effect.void))
970968
: undefined
971969
const variant = input.variant ?? (ag.variant && full?.variants?.[ag.variant] ? ag.variant : undefined)
972970

@@ -986,9 +984,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
986984
format: input.format,
987985
}
988986

989-
yield* Effect.addFinalizer(() =>
990-
InstanceState.withALS(() => instruction.clear(info.id)).pipe(Effect.flatMap((x) => x)),
991-
)
987+
yield* Effect.addFinalizer(() => instruction.clear(info.id))
992988

993989
type Draft<T> = T extends MessageV2.Part ? Omit<T, "id"> & { id?: string } : never
994990
const assign = (part: Draft<MessageV2.Part>): MessageV2.Part => ({
@@ -1459,110 +1455,104 @@ NOTE: At any point in time through this workflow you should feel free to ask the
14591455
model,
14601456
})
14611457

1462-
const outcome: "break" | "continue" = yield* Effect.onExit(
1463-
Effect.gen(function* () {
1464-
const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
1465-
const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
1466-
1467-
const tools = yield* resolveTools({
1468-
agent,
1469-
session,
1470-
model,
1471-
tools: lastUser.tools,
1472-
processor: handle,
1473-
bypassAgentCheck,
1474-
messages: msgs,
1475-
})
1458+
const outcome: "break" | "continue" = yield* Effect.gen(function* () {
1459+
const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
1460+
const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
14761461

1477-
if (lastUser.format?.type === "json_schema") {
1478-
tools["StructuredOutput"] = createStructuredOutputTool({
1479-
schema: lastUser.format.schema,
1480-
onSuccess(output) {
1481-
structured = output
1482-
},
1483-
})
1484-
}
1462+
const tools = yield* resolveTools({
1463+
agent,
1464+
session,
1465+
model,
1466+
tools: lastUser.tools,
1467+
processor: handle,
1468+
bypassAgentCheck,
1469+
messages: msgs,
1470+
})
14851471

1486-
if (step === 1) SessionSummary.summarize({ sessionID, messageID: lastUser.id })
1487-
1488-
if (step > 1 && lastFinished) {
1489-
for (const m of msgs) {
1490-
if (m.info.role !== "user" || m.info.id <= lastFinished.id) continue
1491-
for (const p of m.parts) {
1492-
if (p.type !== "text" || p.ignored || p.synthetic) continue
1493-
if (!p.text.trim()) continue
1494-
p.text = [
1495-
"<system-reminder>",
1496-
"The user sent the following message:",
1497-
p.text,
1498-
"",
1499-
"Please address this message and continue with your tasks.",
1500-
"</system-reminder>",
1501-
].join("\n")
1502-
}
1472+
if (lastUser.format?.type === "json_schema") {
1473+
tools["StructuredOutput"] = createStructuredOutputTool({
1474+
schema: lastUser.format.schema,
1475+
onSuccess(output) {
1476+
structured = output
1477+
},
1478+
})
1479+
}
1480+
1481+
if (step === 1) SessionSummary.summarize({ sessionID, messageID: lastUser.id })
1482+
1483+
if (step > 1 && lastFinished) {
1484+
for (const m of msgs) {
1485+
if (m.info.role !== "user" || m.info.id <= lastFinished.id) continue
1486+
for (const p of m.parts) {
1487+
if (p.type !== "text" || p.ignored || p.synthetic) continue
1488+
if (!p.text.trim()) continue
1489+
p.text = [
1490+
"<system-reminder>",
1491+
"The user sent the following message:",
1492+
p.text,
1493+
"",
1494+
"Please address this message and continue with your tasks.",
1495+
"</system-reminder>",
1496+
].join("\n")
15031497
}
15041498
}
1499+
}
15051500

1506-
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
1507-
1508-
const [skills, env, instructions, modelMsgs] = yield* Effect.all([
1509-
Effect.promise(() => SystemPrompt.skills(agent)),
1510-
Effect.promise(() => SystemPrompt.environment(model)),
1511-
instruction.system().pipe(Effect.orDie),
1512-
Effect.promise(() => MessageV2.toModelMessages(msgs, model)),
1513-
])
1514-
const system = [...env, ...(skills ? [skills] : []), ...instructions]
1515-
const format = lastUser.format ?? { type: "text" as const }
1516-
if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
1517-
const result = yield* handle.process({
1518-
user: lastUser,
1519-
agent,
1520-
permission: session.permission,
1521-
sessionID,
1522-
parentSessionID: session.parentID,
1523-
system,
1524-
messages: [...modelMsgs, ...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : [])],
1525-
tools,
1526-
model,
1527-
toolChoice: format.type === "json_schema" ? "required" : undefined,
1528-
})
1501+
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
1502+
1503+
const [skills, env, instructions, modelMsgs] = yield* Effect.all([
1504+
Effect.promise(() => SystemPrompt.skills(agent)),
1505+
Effect.promise(() => SystemPrompt.environment(model)),
1506+
instruction.system().pipe(Effect.orDie),
1507+
Effect.promise(() => MessageV2.toModelMessages(msgs, model)),
1508+
])
1509+
const system = [...env, ...(skills ? [skills] : []), ...instructions]
1510+
const format = lastUser.format ?? { type: "text" as const }
1511+
if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
1512+
const result = yield* handle.process({
1513+
user: lastUser,
1514+
agent,
1515+
permission: session.permission,
1516+
sessionID,
1517+
parentSessionID: session.parentID,
1518+
system,
1519+
messages: [...modelMsgs, ...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : [])],
1520+
tools,
1521+
model,
1522+
toolChoice: format.type === "json_schema" ? "required" : undefined,
1523+
})
1524+
1525+
if (structured !== undefined) {
1526+
handle.message.structured = structured
1527+
handle.message.finish = handle.message.finish ?? "stop"
1528+
yield* sessions.updateMessage(handle.message)
1529+
return "break" as const
1530+
}
15291531

1530-
if (structured !== undefined) {
1531-
handle.message.structured = structured
1532-
handle.message.finish = handle.message.finish ?? "stop"
1532+
const finished = handle.message.finish && !["tool-calls", "unknown"].includes(handle.message.finish)
1533+
if (finished && !handle.message.error) {
1534+
if (format.type === "json_schema") {
1535+
handle.message.error = new MessageV2.StructuredOutputError({
1536+
message: "Model did not produce structured output",
1537+
retries: 0,
1538+
}).toObject()
15331539
yield* sessions.updateMessage(handle.message)
15341540
return "break" as const
15351541
}
1542+
}
15361543

1537-
const finished = handle.message.finish && !["tool-calls", "unknown"].includes(handle.message.finish)
1538-
if (finished && !handle.message.error) {
1539-
if (format.type === "json_schema") {
1540-
handle.message.error = new MessageV2.StructuredOutputError({
1541-
message: "Model did not produce structured output",
1542-
retries: 0,
1543-
}).toObject()
1544-
yield* sessions.updateMessage(handle.message)
1545-
return "break" as const
1546-
}
1547-
}
1548-
1549-
if (result === "stop") return "break" as const
1550-
if (result === "compact") {
1551-
yield* compaction.create({
1552-
sessionID,
1553-
agent: lastUser.agent,
1554-
model: lastUser.model,
1555-
auto: true,
1556-
overflow: !handle.message.finish,
1557-
})
1558-
}
1559-
return "continue" as const
1560-
}),
1561-
Effect.fnUntraced(function* (exit) {
1562-
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort()
1563-
yield* InstanceState.withALS(() => instruction.clear(handle.message.id)).pipe(Effect.flatMap((x) => x))
1564-
}),
1565-
)
1544+
if (result === "stop") return "break" as const
1545+
if (result === "compact") {
1546+
yield* compaction.create({
1547+
sessionID,
1548+
agent: lastUser.agent,
1549+
model: lastUser.model,
1550+
auto: true,
1551+
overflow: !handle.message.finish,
1552+
})
1553+
}
1554+
return "continue" as const
1555+
}).pipe(Effect.ensuring(instruction.clear(handle.message.id)))
15661556
if (outcome === "break") break
15671557
continue
15681558
}

packages/opencode/src/tool/read.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,7 @@ export const ReadTool = Tool.defineEffect(
6767
if (item.type === "directory") return item.name + "/"
6868
if (item.type !== "symlink") return item.name
6969

70-
const target = yield* fs
71-
.stat(path.join(filepath, item.name))
72-
.pipe(Effect.catch(() => Effect.succeed(undefined)))
70+
const target = yield* fs.stat(path.join(filepath, item.name)).pipe(Effect.catch(() => Effect.void))
7371
if (target?.type === "Directory") return item.name + "/"
7472
return item.name
7573
}),

packages/opencode/test/session/compaction.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@ function fake(
139139
get message() {
140140
return msg
141141
},
142-
abort: Effect.fn("TestSessionProcessor.abort")(() => Effect.void),
143142
partFromToolCall() {
144143
return {
145144
id: PartID.ascending(),

packages/opencode/test/session/processor-effect.test.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -593,9 +593,6 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
593593
yield* Fiber.interrupt(run)
594594

595595
const exit = yield* Fiber.await(run)
596-
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
597-
yield* handle.abort()
598-
}
599596
const parts = MessageV2.parts(msg.id)
600597
const call = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
601598

@@ -665,9 +662,6 @@ it.live("session.processor effect tests record aborted errors and idle state", (
665662
yield* Fiber.interrupt(run)
666663

667664
const exit = yield* Fiber.await(run)
668-
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
669-
yield* handle.abort()
670-
}
671665
yield* Effect.promise(() => seen.promise)
672666
const stored = MessageV2.get({ sessionID: chat.id, messageID: msg.id })
673667
const state = yield* sts.get(chat.id)

0 commit comments

Comments
 (0)