Skip to content

Commit 521665f

Browse files
committed
merge: F7 RunEvents runtime reuse / unblock bus dispatch
PR: #10 (closed, review-only) Diamond: codex-5.3 + Opus both APPROVED Copilot: 7 rounds, R7 clean (no new comments)
2 parents 379dbf4 + 7779646 commit 521665f

2 files changed

Lines changed: 90 additions & 3 deletions

File tree

packages/opencode/src/cli/cmd/run-events.ts

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Effect, Option } from "effect"
1+
import { Cause, Effect, Fiber, Option } from "effect"
22
import { Bus } from "@/bus"
33
import { Permission } from "@/permission"
44
import { Question } from "@/question"
@@ -86,8 +86,44 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
8686
}
8787
}
8888

89+
// bus.subscribeCallback wraps the callback in an Effect.tryPromise-based
90+
// subscription handler, so a Promise-returning callback (like Effect.runPromise)
91+
// serializes handler completion per subscription. runFork returns a Fiber
92+
// synchronously (non-thenable), unblocking dispatch so descendant question/
93+
// permission events are processed concurrently — important for long-running
94+
// subagent loops with many simultaneous descendants. Defects inside the forked
95+
// fiber do not surface through that subscription callback wrapper, so log them
96+
// here instead. Track in-flight fibers so unsubscribe() can interrupt them and
97+
// bound handler work to the RunEvents lifecycle.
98+
const inflight = new Set<Fiber.Fiber<void>>()
99+
let closed = false
100+
const fork = (effect: Effect.Effect<void>) => {
101+
if (closed) {
102+
// unsubscribe() already ran but bus subscription teardown is async, so
103+
// a late callback can still reach fork(). Skip starting the handler
104+
// entirely so no side effects (bump, reject, reply) leak past teardown.
105+
// Returning undefined (not a Promise) still unblocks the bus dispatch
106+
// wrapper without spawning a no-op fiber.
107+
return
108+
}
109+
const fiber = Effect.runFork(
110+
effect.pipe(
111+
Effect.tapCause((cause) =>
112+
Cause.hasInterruptsOnly(cause)
113+
? Effect.void
114+
: Effect.sync(() => log.error("handler failed", { cause })),
115+
),
116+
),
117+
)
118+
inflight.add(fiber)
119+
// Register cleanup outside the forked effect to avoid a TDZ/race between
120+
// synchronous fiber completion and inflight.add — Fiber.await observes
121+
// completion regardless of how fast the fiber runs.
122+
Effect.runFork(Fiber.await(fiber).pipe(Effect.ensuring(Effect.sync(() => inflight.delete(fiber)))))
123+
}
124+
89125
const unsubQuestion = yield* bus.subscribeCallback(Question.Event.Asked, (evt) =>
90-
Effect.runPromise(
126+
fork(
91127
Effect.gen(function* () {
92128
const mine = yield* isDescendant(evt.properties.sessionID)
93129
if (!mine) return
@@ -98,7 +134,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
98134
)
99135

100136
const unsubPermission = yield* bus.subscribeCallback(Permission.Event.Asked, (evt) =>
101-
Effect.runPromise(
137+
fork(
102138
Effect.gen(function* () {
103139
const mine = yield* isDescendant(evt.properties.sessionID)
104140
if (!mine) return
@@ -113,8 +149,13 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
113149
)
114150

115151
const unsubscribe = () => {
152+
closed = true
116153
unsubQuestion()
117154
unsubPermission()
155+
inflight.forEach((fiber) => Effect.runFork(Fiber.interrupt(fiber)))
156+
// Don't clear() — let the per-fiber Fiber.await observers remove entries
157+
// as their interrupts settle, so any stragglers caught by the closed-flag
158+
// branch above still get cleaned up correctly.
118159
}
119160

120161
return { stats, unsubscribe } satisfies Handle

packages/opencode/test/cli/run-events.test.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,4 +523,50 @@ describe("cli/run-events", () => {
523523
}),
524524
),
525525
)
526+
527+
// Lifecycle test for F7 fiber-tracking: after unsubscribe(), late-arriving
528+
// bus callbacks must not produce new auto-rejects. This exercises both the
529+
// bus unsubscription path and the `closed` flag in fork() that prevents
530+
// late callbacks from forking new handler fibers after teardown has begun.
531+
it.live("does not auto-reject question.asked after unsubscribe()", () =>
532+
provideTmpdirInstance(() =>
533+
Effect.gen(function* () {
534+
const question = yield* Question.Service
535+
const rootSessionID = SessionID.make("ses_root_post_unsub_000000000000")
536+
const handler = yield* RunEvents.make({
537+
rootSessionID,
538+
skipPermissions: false,
539+
jsonMode: false,
540+
})
541+
yield* Effect.sync(() => handler.unsubscribe())
542+
543+
// Ask after unsubscribe — without the bus subscription, no auto-reject
544+
// handler runs and the question stays pending.
545+
const fiber = yield* question
546+
.ask({
547+
sessionID: rootSessionID,
548+
questions: [{ question: "post?", header: "h", options: [{ label: "x", description: "x" }] }],
549+
})
550+
.pipe(Effect.forkScoped)
551+
552+
const pending = yield* pollForLength(() => question.list(), 1)
553+
expect(pending[0].sessionID).toBe(rootSessionID)
554+
expect(handler.stats.autoRejectedQuestions).toBe(0)
555+
556+
// Give any late async bus callbacks a chance to run, then verify the
557+
// question is still pending and no auto-reject occurred.
558+
yield* Effect.sleep("50 millis")
559+
const stillPending = yield* question.list()
560+
expect(stillPending).toHaveLength(1)
561+
expect(stillPending[0].id).toBe(pending[0].id)
562+
expect(stillPending[0].sessionID).toBe(rootSessionID)
563+
expect(handler.stats.autoRejectedQuestions).toBe(0)
564+
565+
// Manually clear the pending question so the test can finish.
566+
yield* question.reject(pending[0].id)
567+
const exit = yield* Fiber.await(fiber)
568+
expect(Exit.isFailure(exit)).toBe(true)
569+
}),
570+
),
571+
)
526572
})

0 commit comments

Comments
 (0)