diff --git a/packages/llm/src/protocols/anthropic-messages.ts b/packages/llm/src/protocols/anthropic-messages.ts
index d893888fd22e..e27af18426ef 100644
--- a/packages/llm/src/protocols/anthropic-messages.ts
+++ b/packages/llm/src/protocols/anthropic-messages.ts
@@ -17,6 +17,7 @@ import {
 } from "../schema"
 import { JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
 import * as Cache from "./utils/cache"
+import { Lifecycle } from "./utils/lifecycle"
 import { ToolStream } from "./utils/tool-stream"
 
 const ADAPTER = "anthropic-messages"
@@ -190,6 +191,7 @@ type AnthropicEvent = Schema.Schema.Type<typeof AnthropicEvent>
 interface ParserState {
   readonly tools: ToolStream.State
   readonly usage?: Usage
+  readonly lifecycle: Lifecycle.State
 }
 
 const invalid = ProviderShared.invalidRequest
@@ -500,37 +502,45 @@ const onContentBlockStart = (state: ParserState, event: AnthropicEvent): StepRes
   if (!block) return [state, NO_EVENTS]
 
   if ((block.type === "tool_use" || block.type === "server_tool_use") && event.index !== undefined) {
+    const events: LLMEvent[] = []
+    const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
     return [
       {
         ...state,
+        lifecycle,
         tools: ToolStream.start(state.tools, event.index, {
           id: block.id ?? String(event.index),
           name: block.name ?? "",
           providerExecuted: block.type === "server_tool_use",
         }),
       },
-      NO_EVENTS,
+      [...events, LLMEvent.toolInputStart({ id: block.id ?? String(event.index), name: block.name ?? "" })],
     ]
   }
 
   if (block.type === "text" && block.text) {
-    return [state, [LLMEvent.textDelta({ id: `text-${event.index ?? 0}`, text: block.text })]]
+    const events: LLMEvent[] = []
+    return [
+      { ...state, lifecycle: Lifecycle.textDelta(state.lifecycle, events, `text-${event.index ?? 0}`, block.text) },
+      events,
+    ]
   }
 
   if (block.type === "thinking" && block.thinking) {
+    const events: LLMEvent[] = []
     return [
-      state,
-      [
-        LLMEvent.reasoningDelta({
-          id: `reasoning-${event.index ?? 0}`,
-          text: block.thinking,
-        }),
-      ],
+      {
+        ...state,
+        lifecycle: Lifecycle.reasoningDelta(state.lifecycle, events, `reasoning-${event.index ?? 0}`, block.thinking),
+      },
+      events,
     ]
   }
 
   const result = serverToolResultEvent(block)
-  return [state, result ? [result] : NO_EVENTS]
+  if (!result) return [state, NO_EVENTS]
+  const events: LLMEvent[] = []
+  return [{ ...state, lifecycle: Lifecycle.stepStart(state.lifecycle, events) }, [...events, result]]
 }
 
 const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(function* (
@@ -540,25 +550,37 @@ const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(f
   const delta = event.delta
 
   if (delta?.type === "text_delta" && delta.text) {
-    return [state, [LLMEvent.textDelta({ id: `text-${event.index ?? 0}`, text: delta.text })]] satisfies StepResult
+    const events: LLMEvent[] = []
+    return [
+      { ...state, lifecycle: Lifecycle.textDelta(state.lifecycle, events, `text-${event.index ?? 0}`, delta.text) },
+      events,
+    ] satisfies StepResult
   }
 
   if (delta?.type === "thinking_delta" && delta.thinking) {
+    const events: LLMEvent[] = []
     return [
-      state,
-      [LLMEvent.reasoningDelta({ id: `reasoning-${event.index ?? 0}`, text: delta.thinking })],
+      {
+        ...state,
+        lifecycle: Lifecycle.reasoningDelta(state.lifecycle, events, `reasoning-${event.index ?? 0}`, delta.thinking),
+      },
+      events,
     ] satisfies StepResult
   }
 
   if (delta?.type === "signature_delta" && delta.signature) {
+    const events: LLMEvent[] = []
     return [
-      state,
-      [
-        LLMEvent.reasoningEnd({
-          id: `reasoning-${event.index ?? 0}`,
-          providerMetadata: anthropicMetadata({ signature: delta.signature }),
-        }),
-      ],
+      {
+        ...state,
+        lifecycle: Lifecycle.reasoningEnd(
+          state.lifecycle,
+          events,
+          `reasoning-${event.index ?? 0}`,
+          anthropicMetadata({ signature: delta.signature }),
+        ),
+      },
+      events,
     ] satisfies StepResult
   }
 
@@ -572,7 +594,10 @@ const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(f
       "Anthropic Messages tool argument delta is missing its tool call",
     )
     if (ToolStream.isError(result)) return yield* result
-    return [{ ...state, tools: result.tools }, result.event ? [result.event] : NO_EVENTS] satisfies StepResult
+    const events: LLMEvent[] = []
+    const lifecycle = result.events.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
+    events.push(...result.events)
+    return [{ ...state, lifecycle, tools: result.tools }, events] satisfies StepResult
   }
 
   return [state, NO_EVENTS] satisfies StepResult
})

@@ -584,23 +609,30 @@ const onContentBlockStop = Effect.fn("AnthropicMessages.onContentBlockStop")(fun
 ) {
   if (event.index === undefined) return [state, NO_EVENTS] satisfies StepResult
   const result = yield* ToolStream.finish(ADAPTER, state.tools, event.index)
-  return [{ ...state, tools: result.tools }, result.event ? [result.event] : NO_EVENTS] satisfies StepResult
+  const events: LLMEvent[] = []
+  const resultEvents = result.events ?? []
+  const lifecycle = resultEvents.length
+    ? Lifecycle.stepStart(state.lifecycle, events)
+    : Lifecycle.reasoningEnd(
+        Lifecycle.textEnd(state.lifecycle, events, `text-${event.index}`),
+        events,
+        `reasoning-${event.index}`,
+      )
+  events.push(...resultEvents)
+  return [{ ...state, lifecycle, tools: result.tools }, events] satisfies StepResult
 })
 
 const onMessageDelta = (state: ParserState, event: AnthropicEvent): StepResult => {
   const usage = mergeUsage(state.usage, mapUsage(event.usage))
-  return [
-    { ...state, usage },
-    [
-      LLMEvent.requestFinish({
-        reason: mapFinishReason(event.delta?.stop_reason),
-        usage,
-        providerMetadata: event.delta?.stop_sequence
-          ? anthropicMetadata({ stopSequence: event.delta.stop_sequence })
-          : undefined,
-      }),
-    ],
-  ]
+  const events: LLMEvent[] = []
+  const lifecycle = Lifecycle.finish(state.lifecycle, events, {
+    reason: mapFinishReason(event.delta?.stop_reason),
+    usage,
+    providerMetadata: event.delta?.stop_sequence
+      ? anthropicMetadata({ stopSequence: event.delta.stop_sequence })
+      : undefined,
+  })
+  return [{ ...state, lifecycle, usage }, events]
 }
 
 const onError = (state: ParserState, event: AnthropicEvent): StepResult => [
   state,
@@ -634,7 +666,7 @@ export const protocol = Protocol.make({
   },
   stream: {
     event: Protocol.jsonEvent(AnthropicEvent),
-    initial: () => ({ tools: ToolStream.empty() }),
+    initial: () => ({ tools: ToolStream.empty(), lifecycle: Lifecycle.initial() }),
     step,
   },
 })
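Note on the signature_delta path above: `Lifecycle.reasoningEnd` only emits when the block is actually open, so a stray signature cannot produce an unmatched reasoning-end. A sketch under the module's own API (ids and the metadata shape are illustrative, not asserted by this patch):

import { LLMEvent } from "../schema"
import { Lifecycle } from "./utils/lifecycle"

const events: LLMEvent[] = []
let state = Lifecycle.initial()

// A thinking block streams, then its signature_delta closes it with metadata.
state = Lifecycle.reasoningDelta(state, events, "reasoning-0", "…")
state = Lifecycle.reasoningEnd(state, events, "reasoning-0", { anthropic: { signature: "sig" } })

// Closing a block that never opened is a no-op: nothing is pushed.
state = Lifecycle.reasoningEnd(state, events, "reasoning-9")

// events.map((e) => e.type):
// ["step-start", "reasoning-start", "reasoning-delta", "reasoning-end"]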
diff --git a/packages/llm/src/protocols/bedrock-converse.ts b/packages/llm/src/protocols/bedrock-converse.ts
index f561a6d7c5a9..7f5647c4a7be 100644
--- a/packages/llm/src/protocols/bedrock-converse.ts
+++ b/packages/llm/src/protocols/bedrock-converse.ts
@@ -17,6 +17,7 @@ import { JsonObject, optionalArray, ProviderShared } from "./shared"
 import { BedrockAuth, type Credentials as BedrockCredentials } from "./utils/bedrock-auth"
 import { BedrockCache } from "./utils/bedrock-cache"
 import { BedrockMedia } from "./utils/bedrock-media"
+import { Lifecycle } from "./utils/lifecycle"
 import { ToolStream } from "./utils/tool-stream"
 
 const ADAPTER = "bedrock-converse"
@@ -420,45 +421,64 @@ interface ParserState {
   // `metadata` (carries usage). Hold the terminal event in state so `onHalt`
   // can emit exactly one finish after both chunks have had a chance to arrive.
   readonly pendingFinish: { readonly reason: FinishReason; readonly usage?: Usage } | undefined
+  readonly hasToolCalls: boolean
+  readonly lifecycle: Lifecycle.State
 }
 
 const step = (state: ParserState, event: BedrockEvent) =>
   Effect.gen(function* () {
     if (event.contentBlockStart?.start?.toolUse) {
       const index = event.contentBlockStart.contentBlockIndex
+      const events: LLMEvent[] = []
+      const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
       return [
         {
           ...state,
+          lifecycle,
           tools: ToolStream.start(state.tools, index, {
             id: event.contentBlockStart.start.toolUse.toolUseId,
             name: event.contentBlockStart.start.toolUse.name,
           }),
         },
-        [],
+        [
+          ...events,
+          LLMEvent.toolInputStart({
+            id: event.contentBlockStart.start.toolUse.toolUseId,
+            name: event.contentBlockStart.start.toolUse.name,
+          }),
+        ],
       ] as const
     }
 
     if (event.contentBlockDelta?.delta?.text) {
+      const events: LLMEvent[] = []
       return [
-        state,
-        [
-          LLMEvent.textDelta({
-            id: `text-${event.contentBlockDelta.contentBlockIndex}`,
-            text: event.contentBlockDelta.delta.text,
-          }),
-        ],
+        {
+          ...state,
+          lifecycle: Lifecycle.textDelta(
+            state.lifecycle,
+            events,
+            `text-${event.contentBlockDelta.contentBlockIndex}`,
+            event.contentBlockDelta.delta.text,
+          ),
+        },
+        events,
       ] as const
     }
 
     if (event.contentBlockDelta?.delta?.reasoningContent?.text) {
+      const events: LLMEvent[] = []
       return [
-        state,
-        [
-          LLMEvent.reasoningDelta({
-            id: `reasoning-${event.contentBlockDelta.contentBlockIndex}`,
-            text: event.contentBlockDelta.delta.reasoningContent.text,
-          }),
-        ],
+        {
+          ...state,
+          lifecycle: Lifecycle.reasoningDelta(
+            state.lifecycle,
+            events,
+            `reasoning-${event.contentBlockDelta.contentBlockIndex}`,
+            event.contentBlockDelta.delta.reasoningContent.text,
+          ),
+        },
+        events,
       ] as const
     }
 
@@ -472,12 +492,33 @@ const step = (state: ParserState, event: BedrockEvent) =>
         "Bedrock Converse tool delta is missing its tool call",
       )
       if (ToolStream.isError(result)) return yield* result
-      return [{ ...state, tools: result.tools }, result.event ? [result.event] : []] as const
+      const events: LLMEvent[] = []
+      const lifecycle = result.events.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
+      events.push(...result.events)
+      return [{ ...state, lifecycle, tools: result.tools }, events] as const
     }
 
     if (event.contentBlockStop) {
       const result = yield* ToolStream.finish(ADAPTER, state.tools, event.contentBlockStop.contentBlockIndex)
-      return [{ ...state, tools: result.tools }, result.event ? [result.event] : []] as const
+      const events: LLMEvent[] = []
+      const resultEvents = result.events ?? []
+      const lifecycle = resultEvents.length
+        ? Lifecycle.stepStart(state.lifecycle, events)
+        : Lifecycle.reasoningEnd(
+            Lifecycle.textEnd(state.lifecycle, events, `text-${event.contentBlockStop.contentBlockIndex}`),
+            events,
+            `reasoning-${event.contentBlockStop.contentBlockIndex}`,
+          )
+      events.push(...resultEvents)
+      return [
+        {
+          ...state,
+          hasToolCalls: resultEvents.some(LLMEvent.is.toolCall) ? true : state.hasToolCalls,
+          lifecycle,
+          tools: result.tools,
+        },
+        events,
+      ] as const
    }
 
     if (event.messageStop) {
@@ -517,7 +558,15 @@ const framing = BedrockEventStream.framing(ADAPTER)
 
 const onHalt = (state: ParserState): ReadonlyArray<LLMEvent> =>
   state.pendingFinish
-    ? [LLMEvent.requestFinish({ reason: state.pendingFinish.reason, usage: state.pendingFinish.usage })]
+    ? (() => {
+        const events: LLMEvent[] = []
+        Lifecycle.finish(state.lifecycle, events, {
+          reason:
+            state.pendingFinish.reason === "stop" && state.hasToolCalls ? "tool-calls" : state.pendingFinish.reason,
+          usage: state.pendingFinish.usage,
+        })
+        return events
+      })()
     : []
 
 // =============================================================================
@@ -535,7 +584,12 @@ export const protocol = Protocol.make({
   },
   stream: {
     event: BedrockEvent,
-    initial: () => ({ tools: ToolStream.empty(), pendingFinish: undefined }),
+    initial: () => ({
+      tools: ToolStream.empty(),
+      pendingFinish: undefined,
+      hasToolCalls: false,
+      lifecycle: Lifecycle.initial(),
+    }),
     step,
     onHalt,
   },
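The two-phase finish above is the subtle part: `messageStop` and the usage-bearing `metadata` chunk arrive separately, so nothing terminal is emitted until `onHalt`. A sketch of what that flush produces, assuming a turn that ended on a tool call (the state values are illustrative):

import { LLMEvent } from "../schema"
import { Lifecycle } from "./utils/lifecycle"

const pendingFinish = { reason: "stop" as const, usage: undefined }
const hasToolCalls = true

const events: LLMEvent[] = []
Lifecycle.finish(Lifecycle.initial(), events, {
  // The same coercion as onHalt: a "stop" recorded alongside a tool call
  // is reported as "tool-calls".
  reason: pendingFinish.reason === "stop" && hasToolCalls ? "tool-calls" : pendingFinish.reason,
  usage: pendingFinish.usage,
})

// events.map((e) => e.type): ["step-start", "step-finish", "request-finish"]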
diff --git a/packages/llm/src/protocols/gemini.ts b/packages/llm/src/protocols/gemini.ts
index 0ee88f3beb04..6e0b82abba9a 100644
--- a/packages/llm/src/protocols/gemini.ts
+++ b/packages/llm/src/protocols/gemini.ts
@@ -16,6 +16,7 @@ import {
 } from "../schema"
 import { JsonObject, optionalArray, ProviderShared } from "./shared"
 import { GeminiToolSchema } from "./utils/gemini-tool-schema"
+import { Lifecycle } from "./utils/lifecycle"
 
 const ADAPTER = "gemini"
 export const DEFAULT_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
@@ -134,10 +135,9 @@ interface ParserState {
   readonly hasToolCalls: boolean
   readonly nextToolCallId: number
   readonly usage?: Usage
+  readonly lifecycle: Lifecycle.State
 }
 
-const invalid = ProviderShared.invalidRequest
-
 const mediaData = ProviderShared.mediaBytes
 
 // =============================================================================
@@ -324,7 +324,14 @@ const mapFinishReason = (finishReason: string | undefined, hasToolCalls: boolean
 
 const finish = (state: ParserState): ReadonlyArray<LLMEvent> =>
   state.finishReason || state.usage
-    ? [LLMEvent.requestFinish({ reason: mapFinishReason(state.finishReason, state.hasToolCalls), usage: state.usage })]
+    ? (() => {
+        const events: LLMEvent[] = []
+        Lifecycle.finish(state.lifecycle, events, {
+          reason: mapFinishReason(state.finishReason, state.hasToolCalls),
+          usage: state.usage,
+        })
+        return events
+      })()
     : []
 
 const step = (state: ParserState, event: GeminiEvent) => {
@@ -341,21 +348,21 @@ const step = (state: ParserState, event: GeminiEvent) => {
 
   const events: LLMEvent[] = []
   let hasToolCalls = nextState.hasToolCalls
+  let lifecycle = nextState.lifecycle
   let nextToolCallId = nextState.nextToolCallId
 
   for (const part of candidate.content.parts) {
     if ("text" in part && part.text.length > 0) {
-      events.push(
-        part.thought
-          ? LLMEvent.reasoningDelta({ id: "reasoning-0", text: part.text })
-          : LLMEvent.textDelta({ id: "text-0", text: part.text }),
-      )
+      lifecycle = part.thought
+        ? Lifecycle.reasoningDelta(lifecycle, events, "reasoning-0", part.text)
+        : Lifecycle.textDelta(lifecycle, events, "text-0", part.text)
       continue
     }
 
     if ("functionCall" in part) {
       const input = part.functionCall.args
       const id = `tool_${nextToolCallId++}`
+      lifecycle = Lifecycle.stepStart(lifecycle, events)
       events.push(LLMEvent.toolCall({ id, name: part.functionCall.name, input }))
       hasToolCalls = true
     }
@@ -365,6 +372,7 @@ const step = (state: ParserState, event: GeminiEvent) => {
     {
       ...nextState,
      hasToolCalls,
+      lifecycle,
       nextToolCallId,
       finishReason: candidate.finishReason ?? nextState.finishReason,
     },
@@ -388,7 +396,7 @@ export const protocol = Protocol.make({
   },
   stream: {
     event: Protocol.jsonEvent(GeminiEvent),
-    initial: () => ({ hasToolCalls: false, nextToolCallId: 0 }),
+    initial: () => ({ hasToolCalls: false, nextToolCallId: 0, lifecycle: Lifecycle.initial() }),
     step,
     onHalt: finish,
   },
diff --git a/packages/llm/src/protocols/openai-chat.ts b/packages/llm/src/protocols/openai-chat.ts
index 133adb503bcf..470a1473c40b 100644
--- a/packages/llm/src/protocols/openai-chat.ts
+++ b/packages/llm/src/protocols/openai-chat.ts
@@ -16,6 +16,7 @@ import {
 } from "../schema"
 import { isRecord, JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
 import { OpenAIOptions } from "./utils/openai-options"
+import { Lifecycle } from "./utils/lifecycle"
 import { ToolStream } from "./utils/tool-stream"
 
 const ADAPTER = "openai-chat"
@@ -147,6 +148,7 @@ interface ParserState {
   readonly toolCallEvents: ReadonlyArray<LLMEvent>
   readonly usage?: Usage
   readonly finishReason?: FinishReason
+  readonly lifecycle: Lifecycle.State
 }
 
 const invalid = ProviderShared.invalidRequest
@@ -321,7 +323,9 @@ const step = (state: ParserState, event: OpenAIChatEvent) =>
     const toolDeltas = delta?.tool_calls ?? []
 
     let tools = state.tools
-    if (delta?.content) events.push(LLMEvent.textDelta({ id: "text-0", text: delta.content }))
+    let lifecycle = state.lifecycle
+
+    if (delta?.content) lifecycle = Lifecycle.textDelta(lifecycle, events, "text-0", delta.content)
 
     for (const tool of toolDeltas) {
       const result = ToolStream.appendOrStart(
@@ -333,7 +337,8 @@ const step = (state: ParserState, event: OpenAIChatEvent) =>
       )
       if (ToolStream.isError(result)) return yield* result
       tools = result.tools
-      if (result.event) events.push(result.event)
+      if (result.events.length) lifecycle = Lifecycle.stepStart(lifecycle, events)
+      events.push(...result.events)
     }
 
     // Finalize accumulated tool inputs eagerly when finish_reason arrives so
@@ -349,15 +354,20 @@ const step = (state: ParserState, event: OpenAIChatEvent) =>
         toolCallEvents: finished?.events ?? state.toolCallEvents,
         usage,
         finishReason,
+        lifecycle,
       },
       events,
     ] as const
   })
 
 const finishEvents = (state: ParserState): ReadonlyArray<LLMEvent> => {
+  const events: LLMEvent[] = []
   const hasToolCalls = state.toolCallEvents.length > 0
   const reason = state.finishReason === "stop" && hasToolCalls ? "tool-calls" : state.finishReason
-  return [...state.toolCallEvents, ...(reason ? [LLMEvent.requestFinish({ reason, usage: state.usage })] : [])]
+  const lifecycle = state.toolCallEvents.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
+  events.push(...state.toolCallEvents)
+  if (reason) Lifecycle.finish(lifecycle, events, { reason, usage: state.usage })
+  return events
 }
 
 // =============================================================================
@@ -377,7 +387,7 @@ export const protocol = Protocol.make({
   },
   stream: {
     event: Protocol.jsonEvent(OpenAIChatEvent),
-    initial: () => ({ tools: ToolStream.empty(), toolCallEvents: [] }),
+    initial: () => ({ tools: ToolStream.empty(), toolCallEvents: [], lifecycle: Lifecycle.initial() }),
     step,
     onHalt: finishEvents,
   },
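Because tool calls are buffered in `toolCallEvents` until the stream halts, the ordering contract of `finishEvents` is worth spelling out: the step is opened if it never was, the buffered calls are replayed, and only then does `Lifecycle.finish` close the step. A sketch with one buffered call (inputs are illustrative):

import { LLMEvent } from "../schema"
import { Lifecycle } from "./utils/lifecycle"

const toolCallEvents: LLMEvent[] = [LLMEvent.toolCall({ id: "call_1", name: "lookup", input: { query: "weather" } })]

const events: LLMEvent[] = []
const lifecycle = toolCallEvents.length ? Lifecycle.stepStart(Lifecycle.initial(), events) : Lifecycle.initial()
events.push(...toolCallEvents)
Lifecycle.finish(lifecycle, events, { reason: "tool-calls", usage: undefined })

// events.map((e) => e.type):
// ["step-start", "tool-call", "step-finish", "request-finish"]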
diff --git a/packages/llm/src/protocols/openai-responses.ts b/packages/llm/src/protocols/openai-responses.ts
index 035cc07713cc..e31a42cd5a53 100644
--- a/packages/llm/src/protocols/openai-responses.ts
+++ b/packages/llm/src/protocols/openai-responses.ts
@@ -17,6 +17,7 @@ import {
 } from "../schema"
 import { JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
 import { OpenAIOptions } from "./utils/openai-options"
+import { Lifecycle } from "./utils/lifecycle"
 import { ToolStream } from "./utils/tool-stream"
 
 const ADAPTER = "openai-responses"
@@ -165,6 +166,7 @@ type OpenAIResponsesEvent = Schema.Schema.Type<typeof OpenAIResponsesEvent>
 interface ParserState {
   readonly tools: ToolStream.State
   readonly hasFunctionCall: boolean
+  readonly lifecycle: Lifecycle.State
 }
 
 const invalid = ProviderShared.invalidRequest
@@ -385,23 +387,32 @@ const TERMINAL_TYPES = new Set(["response.completed", "response.incomplete", "re
 
 const onOutputTextDelta = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
   if (!event.delta) return [state, NO_EVENTS]
-  return [state, [LLMEvent.textDelta({ id: event.item_id ?? "text-0", text: event.delta })]]
+  const events: LLMEvent[] = []
+  return [
+    { ...state, lifecycle: Lifecycle.textDelta(state.lifecycle, events, event.item_id ?? "text-0", event.delta) },
+    events,
+  ]
 }
 
 const onOutputItemAdded = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
   const item = event.item
   if (item?.type !== "function_call" || !item.id) return [state, NO_EVENTS]
+  const providerMetadata = openaiMetadata({ itemId: item.id })
+  const events: LLMEvent[] = []
+  const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
   return [
     {
+      ...state,
+      lifecycle,
       hasFunctionCall: state.hasFunctionCall,
       tools: ToolStream.start(state.tools, item.id, {
         id: item.call_id ?? item.id,
         name: item.name ?? "",
         input: item.arguments ?? "",
-        providerMetadata: openaiMetadata({ itemId: item.id }),
+        providerMetadata,
       }),
     },
-    NO_EVENTS,
+    [...events, LLMEvent.toolInputStart({ id: item.call_id ?? item.id, name: item.name ?? "", providerMetadata })],
   ]
 }
 
@@ -418,10 +429,10 @@ const onFunctionCallArgumentsDelta = Effect.fn("OpenAIResponses.onFunctionCallAr
     "OpenAI Responses tool argument delta is missing its tool call",
   )
   if (ToolStream.isError(result)) return yield* result
-  return [
-    { hasFunctionCall: state.hasFunctionCall, tools: result.tools },
-    result.event ? [result.event] : NO_EVENTS,
-  ] satisfies StepResult
+  const events: LLMEvent[] = []
+  const lifecycle = result.events.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
+  events.push(...result.events)
+  return [{ ...state, lifecycle, tools: result.tools }, events] satisfies StepResult
 })
 
 const onOutputItemDone = Effect.fn("OpenAIResponses.onOutputItemDone")(function* (
@@ -440,33 +451,46 @@ const onOutputItemDone = Effect.fn("OpenAIResponses.onOutputItemDone")(function*
       item.arguments === undefined
         ? yield* ToolStream.finish(ADAPTER, tools, item.id)
        : yield* ToolStream.finishWithInput(ADAPTER, tools, item.id, item.arguments)
+    const events: LLMEvent[] = []
+    const resultEvents = result.events ?? []
+    const lifecycle = resultEvents.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
+    events.push(...resultEvents)
     return [
-      { hasFunctionCall: result.event ? true : state.hasFunctionCall, tools: result.tools },
-      result.event ? [result.event] : NO_EVENTS,
+      {
+        ...state,
+        lifecycle,
+        hasFunctionCall: resultEvents.some(LLMEvent.is.toolCall) ? true : state.hasFunctionCall,
+        tools: result.tools,
+      },
+      events,
     ] satisfies StepResult
   }
 
-  if (isHostedToolItem(item)) return [state, hostedToolEvents(item)] satisfies StepResult
+  if (isHostedToolItem(item)) {
+    const events: LLMEvent[] = []
+    const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
+    events.push(...hostedToolEvents(item))
+    return [{ ...state, lifecycle }, events] satisfies StepResult
+  }
 
   return [state, NO_EVENTS] satisfies StepResult
 })
 
-const onResponseFinish = (state: ParserState, event: OpenAIResponsesEvent): StepResult => [
-  state,
-  [
-    LLMEvent.requestFinish({
-      reason: mapFinishReason(event, state.hasFunctionCall),
-      usage: mapUsage(event.response?.usage),
-      providerMetadata:
-        event.response?.id || event.response?.service_tier
-          ? openaiMetadata({
-              responseId: event.response.id,
-              serviceTier: event.response.service_tier,
-            })
-          : undefined,
-    }),
-  ],
-]
+const onResponseFinish = (state: ParserState, event: OpenAIResponsesEvent): StepResult => {
+  const events: LLMEvent[] = []
+  const lifecycle = Lifecycle.finish(state.lifecycle, events, {
+    reason: mapFinishReason(event, state.hasFunctionCall),
+    usage: mapUsage(event.response?.usage),
+    providerMetadata:
+      event.response?.id || event.response?.service_tier
+        ? openaiMetadata({
+            responseId: event.response.id,
+            serviceTier: event.response.service_tier,
+          })
+        : undefined,
+  })
+  return [{ ...state, lifecycle }, events]
+}
 
 const onResponseFailed = (state: ParserState, event: OpenAIResponsesEvent): StepResult => [
   state,
@@ -506,7 +530,7 @@ export const protocol = Protocol.make({
   },
   stream: {
     event: Protocol.jsonEvent(OpenAIResponsesEvent),
-    initial: () => ({ hasFunctionCall: false, tools: ToolStream.empty() }),
+    initial: () => ({ hasFunctionCall: false, tools: ToolStream.empty(), lifecycle: Lifecycle.initial() }),
     step,
     terminal: (event) => TERMINAL_TYPES.has(event.type),
   },
diff --git a/packages/llm/src/protocols/utils/lifecycle.ts b/packages/llm/src/protocols/utils/lifecycle.ts
new file mode 100644
index 000000000000..67039b137a55
--- /dev/null
+++ b/packages/llm/src/protocols/utils/lifecycle.ts
@@ -0,0 +1,88 @@
+import { LLMEvent, type FinishReason, type ProviderMetadata, type Usage } from "../../schema"
+
+export interface State {
+  readonly stepStarted: boolean
+  readonly text: ReadonlySet<string>
+  readonly reasoning: ReadonlySet<string>
+}
+
+export const initial = (): State => ({ stepStarted: false, text: new Set(), reasoning: new Set() })
+
+export const stepStart = (state: State, events: LLMEvent[]): State => {
+  if (state.stepStarted) return state
+  events.push(LLMEvent.stepStart({ index: 0 }))
+  return { ...state, stepStarted: true }
+}
+
+export const textDelta = (state: State, events: LLMEvent[], id: string, text: string): State => {
+  const stepped = stepStart(state, events)
+  if (stepped.text.has(id)) {
+    events.push(LLMEvent.textDelta({ id, text }))
+    return stepped
+  }
+  events.push(LLMEvent.textStart({ id }), LLMEvent.textDelta({ id, text }))
+  return { ...stepped, text: new Set([...stepped.text, id]) }
+}
+
+export const reasoningDelta = (state: State, events: LLMEvent[], id: string, text: string): State => {
+  const stepped = stepStart(state, events)
+  if (stepped.reasoning.has(id)) {
+    events.push(LLMEvent.reasoningDelta({ id, text }))
+    return stepped
+  }
+  events.push(LLMEvent.reasoningStart({ id }), LLMEvent.reasoningDelta({ id, text }))
+  return { ...stepped, reasoning: new Set([...stepped.reasoning, id]) }
+}
+
+export const reasoningEnd = (
+  state: State,
+  events: LLMEvent[],
+  id: string,
+  providerMetadata?: ProviderMetadata,
+): State => {
+  if (!state.reasoning.has(id)) return state
+  const stepped = stepStart(state, events)
+  events.push(LLMEvent.reasoningEnd({ id, providerMetadata }))
+  const reasoning = new Set(stepped.reasoning)
+  reasoning.delete(id)
+  return { ...stepped, reasoning }
+}
+
+export const textEnd = (state: State, events: LLMEvent[], id: string, providerMetadata?: ProviderMetadata): State => {
+  if (!state.text.has(id)) return state
+  const stepped = stepStart(state, events)
+  events.push(LLMEvent.textEnd({ id, providerMetadata }))
+  const text = new Set(stepped.text)
+  text.delete(id)
+  return { ...stepped, text }
+}
+
+const closeOpenBlocks = (state: State, events: LLMEvent[]): State => {
+  for (const id of state.reasoning) events.push(LLMEvent.reasoningEnd({ id }))
+  for (const id of state.text) events.push(LLMEvent.textEnd({ id }))
+  return { ...state, text: new Set(), reasoning: new Set() }
+}
+
+export const finish = (
+  state: State,
+  events: LLMEvent[],
+  input: {
+    readonly reason: FinishReason
+    readonly usage?: Usage
+    readonly providerMetadata?: ProviderMetadata
+  },
+): State => {
+  const stepped = closeOpenBlocks(stepStart(state, events), events)
+  events.push(
+    LLMEvent.stepFinish({
+      index: 0,
+      reason: input.reason,
+      usage: input.usage,
+      providerMetadata: input.providerMetadata,
+    }),
+    LLMEvent.requestFinish(input),
+  )
+  return { ...stepped, stepStarted: false }
+}
+
+export * as Lifecycle from "./lifecycle"
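A consumption sketch for this new module (not part of the file itself): each helper threads an immutable State and pushes any bracketing events into the caller's buffer, so a parser only ever reports deltas. Interleaved reasoning and text, as in the Gemini test later in this patch:

import { LLMEvent } from "../../schema"
import { Lifecycle } from "./lifecycle"

const events: LLMEvent[] = []
let state = Lifecycle.initial()

state = Lifecycle.reasoningDelta(state, events, "reasoning-0", "thinking")
state = Lifecycle.textDelta(state, events, "text-0", "Hello")
state = Lifecycle.textDelta(state, events, "text-0", "!")
state = Lifecycle.finish(state, events, { reason: "stop" })

// events.map((e) => e.type):
// ["step-start",
//  "reasoning-start", "reasoning-delta",
//  "text-start", "text-delta", "text-delta",
//  "reasoning-end", "text-end",            // closeOpenBlocks ends reasoning first
//  "step-finish", "request-finish"]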
diff --git a/packages/llm/src/protocols/utils/tool-stream.ts b/packages/llm/src/protocols/utils/tool-stream.ts
index aa9c70f017b3..8e07a64bfed8 100644
--- a/packages/llm/src/protocols/utils/tool-stream.ts
+++ b/packages/llm/src/protocols/utils/tool-stream.ts
@@ -1,5 +1,5 @@
 import { Effect } from "effect"
-import { LLMError, LLMEvent, type ProviderMetadata, type ToolCall, type ToolInputDelta } from "../../schema"
+import { LLMError, LLMEvent, type ProviderMetadata, type ToolCall } from "../../schema"
 import { eventError, parseToolInput, type ToolAccumulator } from "../shared"
 
 type StreamKey = string | number
@@ -27,13 +27,13 @@ export type State<K extends StreamKey = StreamKey> = Partial<Record<K, PendingTo
 /**
  * Result of adding argument text to one pending tool call. It returns both the
  * next `tools` state and the updated `tool` because parsers often need the
- * current id/name immediately. `event` is present only when new text arrived;
- * metadata-only deltas update identity without emitting `tool-input-delta`.
+ * current id/name immediately. `events` contains lifecycle and delta events
+ * produced by the append; metadata-only deltas update identity without output.
 */
 export interface AppendOutcome {
   readonly tools: State
   readonly tool: PendingTool
-  readonly event?: ToolInputDelta
+  readonly events: ReadonlyArray<LLMEvent>
 }
 
 /** Create empty accumulator state for one provider stream. */
@@ -49,7 +49,14 @@ const withoutTool = <K extends StreamKey>(tools: State<K>, key: K): State<K> =>
   return next
 }
 
-const inputDelta = (tool: PendingTool, text: string): ToolInputDelta =>
+const inputStart = (tool: PendingTool) =>
+  LLMEvent.toolInputStart({
+    id: tool.id,
+    name: tool.name,
+    providerMetadata: tool.providerMetadata,
+  })
+
+const inputDelta = (tool: PendingTool, text: string) =>
   LLMEvent.toolInputDelta({
     id: tool.id,
     name: tool.name,
@@ -76,11 +83,16 @@ const appendTool = <K extends StreamKey>(
   key: K,
   tool: PendingTool,
   text: string,
-): AppendOutcome => ({
-  tools: withTool(tools, key, tool),
-  tool,
-  event: text.length === 0 ? undefined : inputDelta(tool, text),
-})
+): AppendOutcome => {
+  const events: LLMEvent[] = []
+  if (!tools[key]) events.push(inputStart(tool))
+  if (text.length > 0) events.push(inputDelta(tool, text))
+  return {
+    tools: withTool(tools, key, tool),
+    tool,
+    events,
+  }
+}
 
 export const isError = (result: AppendOutcome | LLMError): result is LLMError => result instanceof LLMError
 
@@ -121,7 +133,8 @@ export const appendOrStart = <K extends StreamKey>(
     providerExecuted: current?.providerExecuted,
     providerMetadata: current?.providerMetadata,
   }
-  if (current && delta.text.length === 0 && current.id === id && current.name === name) return { tools, tool: current }
+  if (current && delta.text.length === 0 && current.id === id && current.name === name)
+    return { tools, tool: current, events: [] }
   return appendTool(tools, key, tool, delta.text)
 }
 
@@ -139,7 +152,7 @@ export const appendExisting = <K extends StreamKey>(
 ): AppendOutcome | LLMError => {
   const current = tools[key]
   if (!current) return eventError(route, missingToolMessage)
-  if (text.length === 0) return { tools, tool: current }
+  if (text.length === 0) return { tools, tool: current, events: [] }
   return appendTool(tools, key, { ...current, input: `${current.input}${text}` }, text)
 }
 
@@ -152,7 +165,13 @@ export const finish = <K extends StreamKey>(route: string, tools: State<K>, key:
   Effect.gen(function* () {
     const tool = tools[key]
     if (!tool) return { tools }
-    return { tools: withoutTool(tools, key), event: yield* toolCall(route, tool) }
+    return {
+      tools: withoutTool(tools, key),
+      events: [
+        LLMEvent.toolInputEnd({ id: tool.id, name: tool.name, providerMetadata: tool.providerMetadata }),
+        yield* toolCall(route, tool),
+      ],
+    }
   })
 
 /**
@@ -164,7 +183,13 @@ export const finishWithInput = <K extends StreamKey>(route: string, tools: State
   Effect.gen(function* () {
     const tool = tools[key]
     if (!tool) return { tools }
-    return { tools: withoutTool(tools, key), event: yield* toolCall(route, tool, input) }
+    return {
+      tools: withoutTool(tools, key),
+      events: [
+        LLMEvent.toolInputEnd({ id: tool.id, name: tool.name, providerMetadata: tool.providerMetadata }),
+        yield* toolCall(route, tool, input),
+      ],
+    }
   })
 
 /**
@@ -179,7 +204,14 @@ export const finishAll = <K extends StreamKey>(route: string, tools: State<K>) =
   Effect.gen(function* () {
     const pending = Object.values(tools).filter(
       (tool): tool is PendingTool => tool !== undefined,
     )
     return {
       tools: empty(),
-      events: yield* Effect.forEach(pending, (tool) => toolCall(route, tool)),
+      events: yield* Effect.forEach(pending, (tool) =>
+        toolCall(route, tool).pipe(
+          Effect.map((call) => [
+            LLMEvent.toolInputEnd({ id: tool.id, name: tool.name, providerMetadata: tool.providerMetadata }),
+            call,
+          ]),
+        ),
+      ).pipe(Effect.map((events) => events.flat())),
     }
   })
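A sketch of the widened outcome contract, mirroring the updated tests at the end of this patch. Note the asymmetry: `appendOrStart` emits `tool-input-start` the first time it creates a pending call, whereas parsers that use an explicit `start()` (Anthropic, Bedrock, Responses above) pre-register the call and push their own start event, so later appends only carry deltas. The route name below is illustrative:

import { Effect } from "effect"
import { ToolStream } from "./tool-stream"

// A pending call registered by the explicit start() path; the parser has
// already emitted its own tool-input-start for it.
const tools = ToolStream.start(ToolStream.empty(), 0, { id: "call_1", name: "lookup" })

const program = Effect.gen(function* () {
  const finished = yield* ToolStream.finish("example-route", tools, 0)
  // finished.events brackets the parsed call:
  //   [tool-input-end(call_1), tool-call(call_1)]
  return finished.events ?? []
})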
diff --git a/packages/llm/src/tool-runtime.ts b/packages/llm/src/tool-runtime.ts
index c6e716d45ee3..f46452582703 100644
--- a/packages/llm/src/tool-runtime.ts
+++ b/packages/llm/src/tool-runtime.ts
@@ -154,8 +154,8 @@ const accumulate = (state: StepState, event: LLMEvent) => {
     )
     return
   }
-  if (event.type === "request-finish") {
-    state.finishReason = event.reason
+  if (event.type === "step-finish" || event.type === "request-finish") {
+    state.finishReason = event.reason === "stop" && state.toolCalls.length > 0 ? "tool-calls" : event.reason
   }
 }
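With the lifecycle in place every request now carries both a `step-finish` and a `request-finish`, so `accumulate` reads the reason from either and normalizes it; the guard keeps a provider-reported "stop" from ending a turn that actually produced tool calls. A standalone restatement of that coercion, with types narrowed purely for illustration:

type FinishLike = { type: "step-finish" | "request-finish"; reason: string }

const normalizeReason = (event: FinishLike, toolCallCount: number): string =>
  event.reason === "stop" && toolCallCount > 0 ? "tool-calls" : event.reason

// normalizeReason({ type: "step-finish", reason: "stop" }, 1)      -> "tool-calls"
// normalizeReason({ type: "request-finish", reason: "length" }, 1) -> "length"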
"tool-calls" : event.reason } } diff --git a/packages/llm/test/provider/anthropic-messages.test.ts b/packages/llm/test/provider/anthropic-messages.test.ts index 0df3541d58de..6417f73c2b00 100644 --- a/packages/llm/test/provider/anthropic-messages.test.ts +++ b/packages/llm/test/provider/anthropic-messages.test.ts @@ -146,24 +146,46 @@ describe("Anthropic Messages route", () => { tools: [{ name: "lookup", description: "Lookup data", inputSchema: { type: "object" } }], }), ).pipe(Effect.provide(fixedResponse(body))) + const usage = new Usage({ + inputTokens: 5, + outputTokens: 1, + nonCachedInputTokens: 5, + cacheReadInputTokens: undefined, + cacheWriteInputTokens: undefined, + totalTokens: 6, + providerMetadata: { anthropic: { input_tokens: 5, output_tokens: 1 } }, + }) expect(response.toolCalls).toEqual([ - { type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } }, + { + type: "tool-call", + id: "call_1", + name: "lookup", + input: { query: "weather" }, + providerExecuted: undefined, + providerMetadata: undefined, + }, ]) expect(response.events).toEqual([ + { type: "step-start", index: 0 }, + { type: "tool-input-start", id: "call_1", name: "lookup" }, { type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' }, { type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' }, - { type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } }, + { type: "tool-input-end", id: "call_1", name: "lookup", providerMetadata: undefined }, + { + type: "tool-call", + id: "call_1", + name: "lookup", + input: { query: "weather" }, + providerExecuted: undefined, + providerMetadata: undefined, + }, + { type: "step-finish", index: 0, reason: "tool-calls", usage, providerMetadata: undefined }, { type: "request-finish", reason: "tool-calls", - usage: new Usage({ - inputTokens: 5, - outputTokens: 1, - nonCachedInputTokens: 5, - totalTokens: 6, - providerMetadata: { anthropic: { input_tokens: 5, output_tokens: 1 } }, - }), + providerMetadata: undefined, + usage, }, ]) }), diff --git a/packages/llm/test/provider/gemini.test.ts b/packages/llm/test/provider/gemini.test.ts index ea4eadc4989f..80c32c58b319 100644 --- a/packages/llm/test/provider/gemini.test.ts +++ b/packages/llm/test/provider/gemini.test.ts @@ -204,30 +204,37 @@ describe("Gemini route", () => { reasoningTokens: 1, totalTokens: 7, }) + const usage = new Usage({ + inputTokens: 5, + outputTokens: 3, + nonCachedInputTokens: 4, + cacheReadInputTokens: 1, + reasoningTokens: 1, + totalTokens: 7, + providerMetadata: { + google: { + promptTokenCount: 5, + candidatesTokenCount: 2, + totalTokenCount: 7, + thoughtsTokenCount: 1, + cachedContentTokenCount: 1, + }, + }, + }) expect(response.events).toEqual([ + { type: "step-start", index: 0 }, + { type: "reasoning-start", id: "reasoning-0" }, { type: "reasoning-delta", id: "reasoning-0", text: "thinking" }, + { type: "text-start", id: "text-0" }, { type: "text-delta", id: "text-0", text: "Hello" }, { type: "text-delta", id: "text-0", text: "!" 
}, + { type: "reasoning-end", id: "reasoning-0" }, + { type: "text-end", id: "text-0" }, + { type: "step-finish", index: 0, reason: "stop", usage, providerMetadata: undefined }, { type: "request-finish", reason: "stop", - usage: new Usage({ - inputTokens: 5, - outputTokens: 3, - nonCachedInputTokens: 4, - cacheReadInputTokens: 1, - reasoningTokens: 1, - totalTokens: 7, - providerMetadata: { - google: { - promptTokenCount: 5, - candidatesTokenCount: 2, - totalTokenCount: 7, - thoughtsTokenCount: 1, - cachedContentTokenCount: 1, - }, - }, - }), + usage, }, ]) }), @@ -252,22 +259,41 @@ describe("Gemini route", () => { tools: [{ name: "lookup", description: "Lookup data", inputSchema: { type: "object" } }], }), ).pipe(Effect.provide(fixedResponse(body))) + const usage = new Usage({ + inputTokens: 5, + outputTokens: 1, + nonCachedInputTokens: 5, + cacheReadInputTokens: undefined, + reasoningTokens: undefined, + totalTokens: 6, + providerMetadata: { google: { promptTokenCount: 5, candidatesTokenCount: 1 } }, + }) expect(response.toolCalls).toEqual([ - { type: "tool-call", id: "tool_0", name: "lookup", input: { query: "weather" } }, + { + type: "tool-call", + id: "tool_0", + name: "lookup", + input: { query: "weather" }, + providerExecuted: undefined, + providerMetadata: undefined, + }, ]) expect(response.events).toEqual([ - { type: "tool-call", id: "tool_0", name: "lookup", input: { query: "weather" } }, + { type: "step-start", index: 0 }, + { + type: "tool-call", + id: "tool_0", + name: "lookup", + input: { query: "weather" }, + providerExecuted: undefined, + providerMetadata: undefined, + }, + { type: "step-finish", index: 0, reason: "tool-calls", usage, providerMetadata: undefined }, { type: "request-finish", reason: "tool-calls", - usage: new Usage({ - inputTokens: 5, - outputTokens: 1, - nonCachedInputTokens: 5, - totalTokens: 6, - providerMetadata: { google: { promptTokenCount: 5, candidatesTokenCount: 1 } }, - }), + usage, }, ]) }), @@ -318,8 +344,10 @@ describe("Gemini route", () => { ), ) - expect(length.events).toEqual([{ type: "request-finish", reason: "length" }]) - expect(filtered.events).toEqual([{ type: "request-finish", reason: "content-filter" }]) + expect(length.events.map((event) => event.type)).toEqual(["step-start", "step-finish", "request-finish"]) + expect(length.events.at(-1)).toMatchObject({ type: "request-finish", reason: "length" }) + expect(filtered.events.map((event) => event.type)).toEqual(["step-start", "step-finish", "request-finish"]) + expect(filtered.events.at(-1)).toMatchObject({ type: "request-finish", reason: "content-filter" }) }), ) diff --git a/packages/llm/test/provider/openai-chat.test.ts b/packages/llm/test/provider/openai-chat.test.ts index 9c814226396d..115c58849cfe 100644 --- a/packages/llm/test/provider/openai-chat.test.ts +++ b/packages/llm/test/provider/openai-chat.test.ts @@ -222,31 +222,36 @@ describe("OpenAI Chat route", () => { }), ) const response = yield* LLMClient.generate(request).pipe(Effect.provide(fixedResponse(body))) + const usage = new Usage({ + inputTokens: 5, + outputTokens: 2, + nonCachedInputTokens: 4, + cacheReadInputTokens: 1, + reasoningTokens: 0, + totalTokens: 7, + providerMetadata: { + openai: { + prompt_tokens: 5, + completion_tokens: 2, + total_tokens: 7, + prompt_tokens_details: { cached_tokens: 1 }, + completion_tokens_details: { reasoning_tokens: 0 }, + }, + }, + }) expect(response.text).toBe("Hello!") expect(response.events).toEqual([ + { type: "step-start", index: 0 }, + { type: "text-start", id: "text-0" }, { type: 
"text-delta", id: "text-0", text: "Hello" }, { type: "text-delta", id: "text-0", text: "!" }, + { type: "text-end", id: "text-0" }, + { type: "step-finish", index: 0, reason: "stop", usage, providerMetadata: undefined }, { type: "request-finish", reason: "stop", - usage: new Usage({ - inputTokens: 5, - outputTokens: 2, - nonCachedInputTokens: 4, - cacheReadInputTokens: 1, - reasoningTokens: 0, - totalTokens: 7, - providerMetadata: { - openai: { - prompt_tokens: 5, - completion_tokens: 2, - total_tokens: 7, - prompt_tokens_details: { cached_tokens: 1 }, - completion_tokens_details: { reasoning_tokens: 0 }, - }, - }, - }), + usage, }, ]) }), @@ -269,9 +274,20 @@ describe("OpenAI Chat route", () => { ).pipe(Effect.provide(fixedResponse(body))) expect(response.events).toEqual([ + { type: "step-start", index: 0 }, + { type: "tool-input-start", id: "call_1", name: "lookup", providerMetadata: undefined }, { type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' }, { type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' }, - { type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } }, + { type: "tool-input-end", id: "call_1", name: "lookup", providerMetadata: undefined }, + { + type: "tool-call", + id: "call_1", + name: "lookup", + input: { query: "weather" }, + providerExecuted: undefined, + providerMetadata: undefined, + }, + { type: "step-finish", index: 0, reason: "tool-calls", usage: undefined, providerMetadata: undefined }, { type: "request-finish", reason: "tool-calls", usage: undefined }, ]) }), @@ -293,6 +309,8 @@ describe("OpenAI Chat route", () => { ).pipe(Effect.provide(fixedResponse(body))) expect(response.events).toEqual([ + { type: "step-start", index: 0 }, + { type: "tool-input-start", id: "call_1", name: "lookup", providerMetadata: undefined }, { type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' }, { type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' }, ]) @@ -352,7 +370,7 @@ describe("OpenAI Chat route", () => { const events = Array.from( yield* LLMClient.stream(request).pipe(Stream.take(1), Stream.runCollect, Effect.provide(fixedResponse(body))), ) - expect(events.map((event) => event.type)).toEqual(["text-delta"]) + expect(events.map((event) => event.type)).toEqual(["step-start"]) }), ) }) diff --git a/packages/llm/test/provider/openai-responses.test.ts b/packages/llm/test/provider/openai-responses.test.ts index da9dbd82c22f..8b4469f4edbd 100644 --- a/packages/llm/test/provider/openai-responses.test.ts +++ b/packages/llm/test/provider/openai-responses.test.ts @@ -333,32 +333,43 @@ describe("OpenAI Responses route", () => { }, ) const response = yield* LLMClient.generate(request).pipe(Effect.provide(fixedResponse(body))) + const usage = new Usage({ + inputTokens: 5, + outputTokens: 2, + nonCachedInputTokens: 4, + cacheReadInputTokens: 1, + reasoningTokens: 0, + totalTokens: 7, + providerMetadata: { + openai: { + input_tokens: 5, + output_tokens: 2, + total_tokens: 7, + input_tokens_details: { cached_tokens: 1 }, + output_tokens_details: { reasoning_tokens: 0 }, + }, + }, + }) expect(response.text).toBe("Hello!") expect(response.events).toEqual([ + { type: "step-start", index: 0 }, + { type: "text-start", id: "msg_1" }, { type: "text-delta", id: "msg_1", text: "Hello" }, { type: "text-delta", id: "msg_1", text: "!" 
}, + { type: "text-end", id: "msg_1" }, + { + type: "step-finish", + index: 0, + reason: "stop", + providerMetadata: { openai: { responseId: "resp_1", serviceTier: "default" } }, + usage, + }, { type: "request-finish", reason: "stop", providerMetadata: { openai: { responseId: "resp_1", serviceTier: "default" } }, - usage: new Usage({ - inputTokens: 5, - outputTokens: 2, - nonCachedInputTokens: 4, - cacheReadInputTokens: 1, - reasoningTokens: 0, - totalTokens: 7, - providerMetadata: { - openai: { - input_tokens: 5, - output_tokens: 2, - total_tokens: 7, - input_tokens_details: { cached_tokens: 1 }, - output_tokens_details: { reasoning_tokens: 0 }, - }, - }, - }), + usage, }, ]) }), @@ -390,8 +401,24 @@ describe("OpenAI Responses route", () => { tools: [{ name: "lookup", description: "Lookup data", inputSchema: { type: "object" } }], }), ).pipe(Effect.provide(fixedResponse(body))) + const usage = new Usage({ + inputTokens: 5, + outputTokens: 1, + nonCachedInputTokens: 5, + cacheReadInputTokens: undefined, + reasoningTokens: undefined, + totalTokens: 6, + providerMetadata: { openai: { input_tokens: 5, output_tokens: 1 } }, + }) expect(response.events).toEqual([ + { type: "step-start", index: 0 }, + { + type: "tool-input-start", + id: "call_1", + name: "lookup", + providerMetadata: { openai: { itemId: "item_1" } }, + }, { type: "tool-input-delta", id: "call_1", @@ -404,23 +431,26 @@ describe("OpenAI Responses route", () => { name: "lookup", text: ':"weather"}', }, + { + type: "tool-input-end", + id: "call_1", + name: "lookup", + providerMetadata: { openai: { itemId: "item_1" } }, + }, { type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" }, + providerExecuted: undefined, providerMetadata: { openai: { itemId: "item_1" } }, }, + { type: "step-finish", index: 0, reason: "tool-calls", usage, providerMetadata: undefined }, { type: "request-finish", reason: "tool-calls", - usage: new Usage({ - inputTokens: 5, - outputTokens: 1, - nonCachedInputTokens: 5, - totalTokens: 6, - providerMetadata: { openai: { input_tokens: 5, output_tokens: 1 } }, - }), + providerMetadata: undefined, + usage, }, ]) }), diff --git a/packages/llm/test/tool-runtime.test.ts b/packages/llm/test/tool-runtime.test.ts index 8f4221784d5a..040a11fb6824 100644 --- a/packages/llm/test/tool-runtime.test.ts +++ b/packages/llm/test/tool-runtime.test.ts @@ -313,7 +313,14 @@ describe("LLMClient tools", () => { ), ) - expect(events.map((event) => event.type)).toEqual(["text-delta", "request-finish"]) + expect(events.map((event) => event.type)).toEqual([ + "step-start", + "text-start", + "text-delta", + "text-end", + "step-finish", + "request-finish", + ]) expect(LLMResponse.text({ events })).toBe("Done.") }), ) diff --git a/packages/llm/test/tool-stream.test.ts b/packages/llm/test/tool-stream.test.ts index 04a0035c993f..b005d2666c8f 100644 --- a/packages/llm/test/tool-stream.test.ts +++ b/packages/llm/test/tool-stream.test.ts @@ -21,11 +21,17 @@ describe("ToolStream", () => { if (ToolStream.isError(second)) return yield* second const finished = yield* ToolStream.finish(ADAPTER, second.tools, 0) - expect(first.event).toEqual({ type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' }) - expect(second.event).toEqual({ type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' }) + expect(first.events).toEqual([ + { type: "tool-input-start", id: "call_1", name: "lookup" }, + { type: "tool-input-delta", id: "call_1", name: "lookup", text: '{"query"' }, + ]) + 
expect(second.events).toEqual([{ type: "tool-input-delta", id: "call_1", name: "lookup", text: ':"weather"}' }]) expect(finished).toEqual({ tools: {}, - event: { type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } }, + events: [ + { type: "tool-input-end", id: "call_1", name: "lookup" }, + { type: "tool-call", id: "call_1", name: "lookup", input: { query: "weather" } }, + ], }) }), ) @@ -50,7 +56,10 @@ describe("ToolStream", () => { expect(finished).toEqual({ tools: {}, - event: { type: "tool-call", id: "call_1", name: "lookup", input: { query: "final" } }, + events: [ + { type: "tool-input-end", id: "call_1", name: "lookup" }, + { type: "tool-call", id: "call_1", name: "lookup", input: { query: "final" } }, + ], }) }), ) @@ -73,7 +82,9 @@ describe("ToolStream", () => { expect(finished).toEqual({ tools: {}, events: [ + { type: "tool-input-end", id: "call_1", name: "lookup" }, { type: "tool-call", id: "call_1", name: "lookup", input: {} }, + { type: "tool-input-end", id: "call_2", name: "web_search" }, { type: "tool-call", id: "call_2",