@@ -11,7 +11,7 @@ import {
1111 type ProviderErrorEvent ,
1212} from "@opencode-ai/llm"
1313import { SessionError } from "@opencode-ai/schema/session-error"
14- import { Cause , Effect , Exit , FiberSet , Layer , Option , Semaphore , Stream } from "effect"
14+ import { Cause , Effect , Exit , Fiber , FiberSet , Layer , Option , Semaphore , Stream } from "effect"
1515import { AgentV2 } from "../../agent"
1616import { Config } from "../../config"
1717import { Database } from "../../database/database"
@@ -148,9 +148,6 @@ const layer = Layer.effect(
148148 }
149149 } )
150150
151- const awaitToolFibers = ( fibers : FiberSet . FiberSet < void , ToolOutputStore . Error | UserInterruptedError > ) =>
152- Effect . raceFirst ( FiberSet . join ( fibers ) , FiberSet . awaitEmpty ( fibers ) )
153-
154151 // Match V1: dismissing a question halts the loop instead of becoming model-facing tool output.
155152 const isQuestionRejected = ( cause : Cause . Cause < unknown > ) =>
156153 cause . reasons . some ( ( reason ) => Cause . isDieReason ( reason ) && reason . defect instanceof QuestionV2 . RejectedError )
@@ -188,6 +185,7 @@ const layer = Layer.effect(
188185 session . id ,
189186 )
190187 const toolFibers = yield * FiberSet . make < void , ToolOutputStore . Error | UserInterruptedError > ( )
188+ const ownedToolFibers : Array < Fiber . Fiber < void , ToolOutputStore . Error | UserInterruptedError > > = [ ]
191189 let needsContinuation = false
192190 let currentStep = step
193191 if ( promotion ) {
@@ -265,37 +263,39 @@ const layer = Layer.effect(
265263 }
266264 needsContinuation = true
267265 const assistantMessageID = yield * publisher . assistantMessageID ( event . id )
268- yield * Effect . uninterruptibleMask ( ( restore ) =>
269- restore (
270- toolMaterialization . settle ( {
271- sessionID : session . id ,
272- agent : agent . id ,
273- assistantMessageID,
274- call : event ,
275- } ) ,
276- ) . pipe (
277- Effect . flatMap ( ( settlement ) =>
278- publish (
279- LLMEvent . toolResult ( {
280- id : event . id ,
281- name : event . name ,
282- result : settlement . result ,
283- output : settlement . output ,
284- } ) ,
285- settlement . outputPaths ?? [ ] ,
286- settlement . error ,
287- ) . pipe (
288- Effect . andThen (
289- settlement . error ?. type === "permission.rejected"
290- ? serialized ( publisher . failAssistant ( settlement . error ) ) . pipe (
291- Effect . andThen ( Effect . fail ( new UserInterruptedError ( ) ) ) ,
292- )
293- : Effect . void ,
266+ ownedToolFibers . push (
267+ yield * Effect . uninterruptibleMask ( ( restore ) =>
268+ restore (
269+ toolMaterialization . settle ( {
270+ sessionID : session . id ,
271+ agent : agent . id ,
272+ assistantMessageID,
273+ call : event ,
274+ } ) ,
275+ ) . pipe (
276+ Effect . flatMap ( ( settlement ) =>
277+ publish (
278+ LLMEvent . toolResult ( {
279+ id : event . id ,
280+ name : event . name ,
281+ result : settlement . result ,
282+ output : settlement . output ,
283+ } ) ,
284+ settlement . outputPaths ?? [ ] ,
285+ settlement . error ,
286+ ) . pipe (
287+ Effect . andThen (
288+ settlement . error ?. type === "permission.rejected"
289+ ? serialized ( publisher . failAssistant ( settlement . error ) ) . pipe (
290+ Effect . andThen ( Effect . fail ( new UserInterruptedError ( ) ) ) ,
291+ )
292+ : Effect . void ,
293+ ) ,
294294 ) ,
295295 ) ,
296296 ) ,
297- ) ,
298- ) . pipe ( FiberSet . run ( toolFibers ) )
297+ ) . pipe ( FiberSet . run ( toolFibers ) ) ,
298+ )
299299 } ) ,
300300 ) ,
301301 Effect . ensuring ( serialized ( publisher . flush ( ) ) ) ,
@@ -345,8 +345,8 @@ const layer = Layer.effect(
345345 return { _tag : "RestartAfterOverflowCompaction" , step : currentStep } as const
346346
347347 // An unrecovered held-back overflow becomes the step's durable provider error. A
348- // thrown LLM failure fails hosted tool calls and the assistant unless a provider
349- // error was already recorded from the stream.
348+ // thrown LLM failure records the assistant failure unless a provider error was
349+ // already recorded from the stream. Terminal publication waits for owned tools .
350350 if ( overflowFailure ) yield * publish ( overflowFailure )
351351 const llmFailure = streamFailure instanceof LLMError ? streamFailure : undefined
352352 if ( llmFailure && ! publisher . hasProviderError ( ) ) {
@@ -363,39 +363,39 @@ const layer = Layer.effect(
363363 step : currentStep ,
364364 } )
365365 }
366- yield * serialized (
367- publisher . failUnsettledTools (
368- { type : "tool.result-missing" , message : "Provider did not return a tool result" } ,
369- true ,
370- ) ,
371- )
372366 yield * serialized ( publisher . failAssistant ( error ) )
373367 }
374368 // Provider error events only arrive from the stream, so the flag is final here.
375369 const providerFailed = publisher . hasProviderError ( )
376370
377- // Settle tool fibers: an interrupted stream abandons unstarted tool work first.
371+ // Settle every owned tool fiber. FiberSet.join returns on the first failure, so retain
372+ // the individual fibers and await all exits before publishing the terminal step event.
378373 if ( streamInterrupted ) yield * FiberSet . clear ( toolFibers )
379- const settled = yield * restore ( awaitToolFibers ( toolFibers ) ) . pipe ( Effect . exit )
380- const toolsInterrupted = settled . _tag === "Failure" && Cause . hasInterrupts ( settled . cause )
381- const questionDismissed = settled . _tag === "Failure" && isQuestionRejected ( settled . cause )
382- const settledError =
383- settled . _tag === "Failure" ? Option . getOrUndefined ( Cause . findErrorOption ( settled . cause ) ) : undefined
384- const permissionRejected = settledError instanceof UserInterruptedError
374+ const settled = yield * restore (
375+ Effect . forEach ( ownedToolFibers , Fiber . await , { concurrency : "unbounded" } ) ,
376+ ) . pipe ( Effect . exit )
377+ const settledCauses =
378+ settled . _tag === "Failure"
379+ ? [ settled . cause ]
380+ : settled . value . flatMap ( ( exit ) => ( exit . _tag === "Failure" ? [ exit . cause ] : [ ] ) )
381+ const toolsInterrupted = settledCauses . some ( Cause . hasInterrupts )
382+ const questionDismissed = settledCauses . some ( isQuestionRejected )
383+ const permissionRejected = settledCauses . some (
384+ ( cause ) => Option . getOrUndefined ( Cause . findErrorOption ( cause ) ) instanceof UserInterruptedError ,
385+ )
385386
386387 if ( questionDismissed || permissionRejected || streamInterrupted || toolsInterrupted ) {
387388 yield * FiberSet . clear ( toolFibers )
388389 yield * serialized ( publisher . failUnsettledTools ( { type : "aborted" , message : "Tool execution interrupted" } ) )
389390 yield * serialized ( publisher . failAssistant ( { type : "aborted" , message : "Step interrupted" } ) )
390- // Match V1: dismissing a question halts the loop like an interruption.
391- if ( questionDismissed || permissionRejected ) return yield * new UserInterruptedError ( )
392391 }
393392 // A settled tool fiber failure is one of two things. A defect from a tool
394393 // implementation becomes a failed tool call the model can read, and the step still
395394 // settles so the model may recover. A typed infrastructure failure (tool output
396395 // could not be persisted) also fails the assistant and then fails the drain.
397- const settledFailure =
398- settled . _tag === "Failure" && ! toolsInterrupted && ! permissionRejected ? settled . cause : undefined
396+ const settledFailure = settledCauses . find (
397+ ( cause ) => ! Cause . hasInterrupts ( cause ) && ! isQuestionRejected ( cause ) && ! permissionRejected ,
398+ )
399399 const infraError =
400400 settledFailure === undefined ? undefined : Option . getOrUndefined ( Cause . findErrorOption ( settledFailure ) )
401401 if ( settledFailure !== undefined ) {
@@ -405,26 +405,50 @@ const layer = Layer.effect(
405405 if ( infraError !== undefined ) yield * serialized ( publisher . failAssistant ( error ) )
406406 }
407407
408- const stepSettlement = publisher . stepSettlement ( )
409- const stepEndedCleanly =
410- ! streamInterrupted && ! toolsInterrupted && infraError === undefined && ! providerFailed
411- if ( stepSettlement && stepEndedCleanly ) yield * publishStepEnd ( stepSettlement )
412- // A provider error orphans recorded local calls; a clean stream can still leave
413- // hosted calls without results.
408+ // Fail unresolved calls before the terminal step event. Local calls have joined, so
409+ // these sweeps only close calls that could not produce a truthful settlement.
414410 if ( providerFailed )
415411 yield * serialized ( publisher . failUnsettledTools ( { type : "aborted" , message : "Tool execution interrupted" } ) )
416- if ( stream . _tag === "Success" && ! providerFailed )
412+ if ( llmFailure && ! providerFailed )
417413 yield * serialized (
418414 publisher . failUnsettledTools (
419- { type : "tool.result-missing" , message : "Provider did not return a tool result" } ,
415+ {
416+ type : "tool.result-missing" ,
417+ message : "Provider did not return a tool result" ,
418+ } ,
420419 true ,
421420 ) ,
422421 )
422+ const hostedResultMissing =
423+ stream . _tag === "Success" && ! providerFailed
424+ ? yield * serialized (
425+ publisher . failUnsettledTools (
426+ { type : "tool.result-missing" , message : "Provider did not return a tool result" } ,
427+ true ,
428+ ) ,
429+ )
430+ : false
431+ if ( hostedResultMissing && ! publisher . stepSettlement ( ) )
432+ yield * serialized (
433+ publisher . failAssistant ( {
434+ type : "tool.result-missing" ,
435+ message : "Provider did not return a tool result" ,
436+ } ) ,
437+ )
423438
424- if ( stream . _tag === "Failure" ) return yield * Effect . failCause ( stream . cause )
425- if ( settled . _tag === "Failure" && ( toolsInterrupted || infraError !== undefined ) )
426- return yield * Effect . failCause ( settled . cause )
427439 const stepFailure = publisher . stepFailure ( )
440+ const stepSettlement = publisher . stepSettlement ( )
441+ const stepEndedCleanly =
442+ ! streamInterrupted && ! toolsInterrupted && infraError === undefined && ! providerFailed && ! stepFailure
443+ if ( stepSettlement && stepEndedCleanly ) yield * publishStepEnd ( stepSettlement )
444+ if ( stepFailure ) yield * serialized ( publisher . publishStepFailure ( ) )
445+
446+ if ( stream . _tag === "Failure" ) return yield * Effect . failCause ( stream . cause )
447+ // Match V1: dismissing a question halts the loop like an interruption.
448+ if ( questionDismissed || permissionRejected ) return yield * new UserInterruptedError ( )
449+ if ( ( toolsInterrupted || infraError !== undefined ) && settledFailure )
450+ return yield * Effect . failCause ( settledFailure )
451+ if ( toolsInterrupted && settled . _tag === "Failure" ) return yield * Effect . failCause ( settled . cause )
428452 if ( stepFailure ) return yield * new StepFailedError ( { error : stepFailure } )
429453 return {
430454 _tag : "Completed" ,
0 commit comments