Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 67 additions & 35 deletions packages/llm/src/protocols/anthropic-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from "../schema"
import { JsonObject, optionalArray, optionalNull, ProviderShared } from "./shared"
import * as Cache from "./utils/cache"
import { Lifecycle } from "./utils/lifecycle"
import { ToolStream } from "./utils/tool-stream"

const ADAPTER = "anthropic-messages"
Expand Down Expand Up @@ -190,6 +191,7 @@ type AnthropicEvent = Schema.Schema.Type<typeof AnthropicEvent>
interface ParserState {
readonly tools: ToolStream.State<number>
readonly usage?: Usage
readonly lifecycle: Lifecycle.State
}

const invalid = ProviderShared.invalidRequest
Expand Down Expand Up @@ -500,37 +502,45 @@ const onContentBlockStart = (state: ParserState, event: AnthropicEvent): StepRes
if (!block) return [state, NO_EVENTS]

if ((block.type === "tool_use" || block.type === "server_tool_use") && event.index !== undefined) {
const events: LLMEvent[] = []
const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
return [
{
...state,
lifecycle,
tools: ToolStream.start(state.tools, event.index, {
id: block.id ?? String(event.index),
name: block.name ?? "",
providerExecuted: block.type === "server_tool_use",
}),
},
NO_EVENTS,
[...events, LLMEvent.toolInputStart({ id: block.id ?? String(event.index), name: block.name ?? "" })],
]
}

if (block.type === "text" && block.text) {
return [state, [LLMEvent.textDelta({ id: `text-${event.index ?? 0}`, text: block.text })]]
const events: LLMEvent[] = []
return [
{ ...state, lifecycle: Lifecycle.textDelta(state.lifecycle, events, `text-${event.index ?? 0}`, block.text) },
events,
]
}

if (block.type === "thinking" && block.thinking) {
const events: LLMEvent[] = []
return [
state,
[
LLMEvent.reasoningDelta({
id: `reasoning-${event.index ?? 0}`,
text: block.thinking,
}),
],
{
...state,
lifecycle: Lifecycle.reasoningDelta(state.lifecycle, events, `reasoning-${event.index ?? 0}`, block.thinking),
},
events,
]
}

const result = serverToolResultEvent(block)
return [state, result ? [result] : NO_EVENTS]
if (!result) return [state, NO_EVENTS]
const events: LLMEvent[] = []
return [{ ...state, lifecycle: Lifecycle.stepStart(state.lifecycle, events) }, [...events, result]]
}

const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(function* (
Expand All @@ -540,25 +550,37 @@ const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(f
const delta = event.delta

if (delta?.type === "text_delta" && delta.text) {
return [state, [LLMEvent.textDelta({ id: `text-${event.index ?? 0}`, text: delta.text })]] satisfies StepResult
const events: LLMEvent[] = []
return [
{ ...state, lifecycle: Lifecycle.textDelta(state.lifecycle, events, `text-${event.index ?? 0}`, delta.text) },
events,
] satisfies StepResult
}

if (delta?.type === "thinking_delta" && delta.thinking) {
const events: LLMEvent[] = []
return [
state,
[LLMEvent.reasoningDelta({ id: `reasoning-${event.index ?? 0}`, text: delta.thinking })],
{
...state,
lifecycle: Lifecycle.reasoningDelta(state.lifecycle, events, `reasoning-${event.index ?? 0}`, delta.thinking),
},
events,
] satisfies StepResult
}

if (delta?.type === "signature_delta" && delta.signature) {
const events: LLMEvent[] = []
return [
state,
[
LLMEvent.reasoningEnd({
id: `reasoning-${event.index ?? 0}`,
providerMetadata: anthropicMetadata({ signature: delta.signature }),
}),
],
{
...state,
lifecycle: Lifecycle.reasoningEnd(
state.lifecycle,
events,
`reasoning-${event.index ?? 0}`,
anthropicMetadata({ signature: delta.signature }),
),
},
events,
] satisfies StepResult
}

Expand All @@ -572,7 +594,10 @@ const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(f
"Anthropic Messages tool argument delta is missing its tool call",
)
if (ToolStream.isError(result)) return yield* result
return [{ ...state, tools: result.tools }, result.event ? [result.event] : NO_EVENTS] satisfies StepResult
const events: LLMEvent[] = []
const lifecycle = result.events.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
events.push(...result.events)
return [{ ...state, lifecycle, tools: result.tools }, events] satisfies StepResult
}

return [state, NO_EVENTS] satisfies StepResult
Expand All @@ -584,23 +609,30 @@ const onContentBlockStop = Effect.fn("AnthropicMessages.onContentBlockStop")(fun
) {
if (event.index === undefined) return [state, NO_EVENTS] satisfies StepResult
const result = yield* ToolStream.finish(ADAPTER, state.tools, event.index)
return [{ ...state, tools: result.tools }, result.event ? [result.event] : NO_EVENTS] satisfies StepResult
const events: LLMEvent[] = []
const resultEvents = result.events ?? []
const lifecycle = resultEvents.length
? Lifecycle.stepStart(state.lifecycle, events)
: Lifecycle.reasoningEnd(
Lifecycle.textEnd(state.lifecycle, events, `text-${event.index}`),
events,
`reasoning-${event.index}`,
)
events.push(...resultEvents)
return [{ ...state, lifecycle, tools: result.tools }, events] satisfies StepResult
})

const onMessageDelta = (state: ParserState, event: AnthropicEvent): StepResult => {
const usage = mergeUsage(state.usage, mapUsage(event.usage))
return [
{ ...state, usage },
[
LLMEvent.requestFinish({
reason: mapFinishReason(event.delta?.stop_reason),
usage,
providerMetadata: event.delta?.stop_sequence
? anthropicMetadata({ stopSequence: event.delta.stop_sequence })
: undefined,
}),
],
]
const events: LLMEvent[] = []
const lifecycle = Lifecycle.finish(state.lifecycle, events, {
reason: mapFinishReason(event.delta?.stop_reason),
usage,
providerMetadata: event.delta?.stop_sequence
? anthropicMetadata({ stopSequence: event.delta.stop_sequence })
: undefined,
})
return [{ ...state, lifecycle, usage }, events]
}

const onError = (state: ParserState, event: AnthropicEvent): StepResult => [
Expand Down Expand Up @@ -634,7 +666,7 @@ export const protocol = Protocol.make({
},
stream: {
event: Protocol.jsonEvent(AnthropicEvent),
initial: () => ({ tools: ToolStream.empty<number>() }),
initial: () => ({ tools: ToolStream.empty<number>(), lifecycle: Lifecycle.initial() }),
step,
},
})
Expand Down
92 changes: 73 additions & 19 deletions packages/llm/src/protocols/bedrock-converse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { JsonObject, optionalArray, ProviderShared } from "./shared"
import { BedrockAuth, type Credentials as BedrockCredentials } from "./utils/bedrock-auth"
import { BedrockCache } from "./utils/bedrock-cache"
import { BedrockMedia } from "./utils/bedrock-media"
import { Lifecycle } from "./utils/lifecycle"
import { ToolStream } from "./utils/tool-stream"

const ADAPTER = "bedrock-converse"
Expand Down Expand Up @@ -420,45 +421,64 @@ interface ParserState {
// `metadata` (carries usage). Hold the terminal event in state so `onHalt`
// can emit exactly one finish after both chunks have had a chance to arrive.
readonly pendingFinish: { readonly reason: FinishReason; readonly usage?: Usage } | undefined
readonly hasToolCalls: boolean
readonly lifecycle: Lifecycle.State
}

const step = (state: ParserState, event: BedrockEvent) =>
Effect.gen(function* () {
if (event.contentBlockStart?.start?.toolUse) {
const index = event.contentBlockStart.contentBlockIndex
const events: LLMEvent[] = []
const lifecycle = Lifecycle.stepStart(state.lifecycle, events)
return [
{
...state,
lifecycle,
tools: ToolStream.start(state.tools, index, {
id: event.contentBlockStart.start.toolUse.toolUseId,
name: event.contentBlockStart.start.toolUse.name,
}),
},
[],
[
...events,
LLMEvent.toolInputStart({
id: event.contentBlockStart.start.toolUse.toolUseId,
name: event.contentBlockStart.start.toolUse.name,
}),
],
] as const
}

if (event.contentBlockDelta?.delta?.text) {
const events: LLMEvent[] = []
return [
state,
[
LLMEvent.textDelta({
id: `text-${event.contentBlockDelta.contentBlockIndex}`,
text: event.contentBlockDelta.delta.text,
}),
],
{
...state,
lifecycle: Lifecycle.textDelta(
state.lifecycle,
events,
`text-${event.contentBlockDelta.contentBlockIndex}`,
event.contentBlockDelta.delta.text,
),
},
events,
] as const
}

if (event.contentBlockDelta?.delta?.reasoningContent?.text) {
const events: LLMEvent[] = []
return [
state,
[
LLMEvent.reasoningDelta({
id: `reasoning-${event.contentBlockDelta.contentBlockIndex}`,
text: event.contentBlockDelta.delta.reasoningContent.text,
}),
],
{
...state,
lifecycle: Lifecycle.reasoningDelta(
state.lifecycle,
events,
`reasoning-${event.contentBlockDelta.contentBlockIndex}`,
event.contentBlockDelta.delta.reasoningContent.text,
),
},
events,
] as const
}

Expand All @@ -472,12 +492,33 @@ const step = (state: ParserState, event: BedrockEvent) =>
"Bedrock Converse tool delta is missing its tool call",
)
if (ToolStream.isError(result)) return yield* result
return [{ ...state, tools: result.tools }, result.event ? [result.event] : []] as const
const events: LLMEvent[] = []
const lifecycle = result.events.length ? Lifecycle.stepStart(state.lifecycle, events) : state.lifecycle
events.push(...result.events)
return [{ ...state, lifecycle, tools: result.tools }, events] as const
}

if (event.contentBlockStop) {
const result = yield* ToolStream.finish(ADAPTER, state.tools, event.contentBlockStop.contentBlockIndex)
return [{ ...state, tools: result.tools }, result.event ? [result.event] : []] as const
const events: LLMEvent[] = []
const resultEvents = result.events ?? []
const lifecycle = resultEvents.length
? Lifecycle.stepStart(state.lifecycle, events)
: Lifecycle.reasoningEnd(
Lifecycle.textEnd(state.lifecycle, events, `text-${event.contentBlockStop.contentBlockIndex}`),
events,
`reasoning-${event.contentBlockStop.contentBlockIndex}`,
)
events.push(...resultEvents)
return [
{
...state,
hasToolCalls: resultEvents.some(LLMEvent.is.toolCall) ? true : state.hasToolCalls,
lifecycle,
tools: result.tools,
},
events,
] as const
}

if (event.messageStop) {
Expand Down Expand Up @@ -517,7 +558,15 @@ const framing = BedrockEventStream.framing(ADAPTER)

const onHalt = (state: ParserState): ReadonlyArray<LLMEvent> =>
state.pendingFinish
? [LLMEvent.requestFinish({ reason: state.pendingFinish.reason, usage: state.pendingFinish.usage })]
? (() => {
const events: LLMEvent[] = []
Lifecycle.finish(state.lifecycle, events, {
reason:
state.pendingFinish.reason === "stop" && state.hasToolCalls ? "tool-calls" : state.pendingFinish.reason,
usage: state.pendingFinish.usage,
})
return events
})()
: []

// =============================================================================
Expand All @@ -535,7 +584,12 @@ export const protocol = Protocol.make({
},
stream: {
event: BedrockEvent,
initial: () => ({ tools: ToolStream.empty<number>(), pendingFinish: undefined }),
initial: () => ({
tools: ToolStream.empty<number>(),
pendingFinish: undefined,
hasToolCalls: false,
lifecycle: Lifecycle.initial(),
}),
step,
onHalt,
},
Expand Down
Loading
Loading