Skip to content

Commit ce061bf

Browse files
authored
Add explicit LLM stream lifecycle events (#26722)
1 parent 3b8790e commit ce061bf

16 files changed

Lines changed: 200 additions & 125 deletions

packages/llm/example/tutorial.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ const FakeProtocol = Protocol.make<FakeBody, string, string, void>({
184184
stream: {
185185
event: Schema.String,
186186
initial: () => undefined,
187-
step: (_, frame) => Effect.succeed([undefined, [{ type: "text-delta", text: frame }]] as const),
187+
step: (_, frame) => Effect.succeed([undefined, [{ type: "text-delta", id: "text-0", text: frame }]] as const),
188188
onHalt: () => [{ type: "request-finish", reason: "stop" }],
189189
},
190190
})

packages/llm/src/protocols/anthropic-messages.ts

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import { Endpoint } from "../route/endpoint"
55
import { Framing } from "../route/framing"
66
import { Protocol } from "../route/protocol"
77
import {
8+
LLMEvent,
89
Usage,
910
type CacheHint,
1011
type FinishReason,
11-
type LLMEvent,
1212
type LLMRequest,
1313
type ProviderMetadata,
1414
type ToolCallPart,
@@ -415,14 +415,13 @@ const serverToolResultEvent = (block: NonNullable<AnthropicEvent["content_block"
415415
? String((block.content as Record<string, unknown>).type)
416416
: ""
417417
const isError = errorPayload.endsWith("_tool_result_error")
418-
return {
419-
type: "tool-result",
418+
return LLMEvent.toolResult({
420419
id: block.tool_use_id ?? "",
421420
name: SERVER_TOOL_RESULT_NAMES[block.type],
422421
result: isError ? { type: "error", value: block.content } : { type: "json", value: block.content },
423422
providerExecuted: true,
424423
providerMetadata: anthropicMetadata({ blockType: block.type }),
425-
}
424+
})
426425
}
427426

428427
type StepResult = readonly [ParserState, ReadonlyArray<LLMEvent>]
@@ -453,18 +452,17 @@ const onContentBlockStart = (state: ParserState, event: AnthropicEvent): StepRes
453452
}
454453

455454
if (block.type === "text" && block.text) {
456-
return [state, [{ type: "text-delta", text: block.text }]]
455+
return [state, [LLMEvent.textDelta({ id: `text-${event.index ?? 0}`, text: block.text })]]
457456
}
458457

459458
if (block.type === "thinking" && block.thinking) {
460459
return [
461460
state,
462461
[
463-
{
464-
type: "reasoning-delta",
462+
LLMEvent.reasoningDelta({
463+
id: `reasoning-${event.index ?? 0}`,
465464
text: block.thinking,
466-
...(block.signature ? { providerMetadata: anthropicMetadata({ signature: block.signature }) } : {}),
467-
},
465+
}),
468466
],
469467
]
470468
}
@@ -480,17 +478,17 @@ const onContentBlockDelta = Effect.fn("AnthropicMessages.onContentBlockDelta")(f
480478
const delta = event.delta
481479

482480
if (delta?.type === "text_delta" && delta.text) {
483-
return [state, [{ type: "text-delta", text: delta.text }]] satisfies StepResult
481+
return [state, [LLMEvent.textDelta({ id: `text-${event.index ?? 0}`, text: delta.text })]] satisfies StepResult
484482
}
485483

486484
if (delta?.type === "thinking_delta" && delta.thinking) {
487-
return [state, [{ type: "reasoning-delta", text: delta.thinking }]] satisfies StepResult
485+
return [state, [LLMEvent.reasoningDelta({ id: `reasoning-${event.index ?? 0}`, text: delta.thinking })]] satisfies StepResult
488486
}
489487

490488
if (delta?.type === "signature_delta" && delta.signature) {
491489
return [
492490
state,
493-
[{ type: "reasoning-delta", text: "", providerMetadata: anthropicMetadata({ signature: delta.signature }) }],
491+
[LLMEvent.reasoningEnd({ id: `reasoning-${event.index ?? 0}`, providerMetadata: anthropicMetadata({ signature: delta.signature }) })],
494492
] satisfies StepResult
495493
}
496494

@@ -524,21 +522,20 @@ const onMessageDelta = (state: ParserState, event: AnthropicEvent): StepResult =
524522
return [
525523
{ ...state, usage },
526524
[
527-
{
528-
type: "request-finish",
525+
LLMEvent.requestFinish({
529526
reason: mapFinishReason(event.delta?.stop_reason),
530527
usage,
531-
...(event.delta?.stop_sequence
532-
? { providerMetadata: anthropicMetadata({ stopSequence: event.delta.stop_sequence }) }
533-
: {}),
534-
},
528+
providerMetadata: event.delta?.stop_sequence
529+
? anthropicMetadata({ stopSequence: event.delta.stop_sequence })
530+
: undefined,
531+
}),
535532
],
536533
]
537534
}
538535

539536
const onError = (state: ParserState, event: AnthropicEvent): StepResult => [
540537
state,
541-
[{ type: "provider-error", message: event.error?.message ?? "Anthropic Messages stream error" }],
538+
[LLMEvent.providerError({ message: event.error?.message ?? "Anthropic Messages stream error" })],
542539
]
543540

544541
const step = (state: ParserState, event: AnthropicEvent) => {

packages/llm/src/protocols/bedrock-converse.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ import { Route, type RouteModelInput } from "../route/client"
33
import { Endpoint } from "../route/endpoint"
44
import { Protocol } from "../route/protocol"
55
import {
6+
LLMEvent,
67
Usage,
78
type CacheHint,
89
type FinishReason,
9-
type LLMEvent,
1010
type LLMRequest,
1111
type ToolCallPart,
1212
type ToolDefinition,
@@ -400,13 +400,26 @@ const step = (state: ParserState, event: BedrockEvent) =>
400400
}
401401

402402
if (event.contentBlockDelta?.delta?.text) {
403-
return [state, [{ type: "text-delta" as const, text: event.contentBlockDelta.delta.text }]] as const
403+
return [
404+
state,
405+
[
406+
LLMEvent.textDelta({
407+
id: `text-${event.contentBlockDelta.contentBlockIndex}`,
408+
text: event.contentBlockDelta.delta.text,
409+
}),
410+
],
411+
] as const
404412
}
405413

406414
if (event.contentBlockDelta?.delta?.reasoningContent?.text) {
407415
return [
408416
state,
409-
[{ type: "reasoning-delta" as const, text: event.contentBlockDelta.delta.reasoningContent.text }],
417+
[
418+
LLMEvent.reasoningDelta({
419+
id: `reasoning-${event.contentBlockDelta.contentBlockIndex}`,
420+
text: event.contentBlockDelta.delta.reasoningContent.text,
421+
}),
422+
],
410423
] as const
411424
}
412425

@@ -449,15 +462,15 @@ const step = (state: ParserState, event: BedrockEvent) =>
449462
event.modelStreamErrorException?.message ??
450463
event.serviceUnavailableException?.message ??
451464
"Bedrock Converse stream error"
452-
return [state, [{ type: "provider-error" as const, message, retryable: true }]] as const
465+
return [state, [LLMEvent.providerError({ message, retryable: true })]] as const
453466
}
454467

455468
if (event.validationException || event.throttlingException) {
456469
const message =
457470
event.validationException?.message ?? event.throttlingException?.message ?? "Bedrock Converse error"
458471
return [
459472
state,
460-
[{ type: "provider-error" as const, message, retryable: event.throttlingException !== undefined }],
473+
[LLMEvent.providerError({ message, retryable: event.throttlingException !== undefined })],
461474
] as const
462475
}
463476

@@ -468,7 +481,7 @@ const framing = BedrockEventStream.framing(ADAPTER)
468481

469482
const onHalt = (state: ParserState): ReadonlyArray<LLMEvent> =>
470483
state.pendingFinish
471-
? [{ type: "request-finish", reason: state.pendingFinish.reason, usage: state.pendingFinish.usage }]
484+
? [LLMEvent.requestFinish({ reason: state.pendingFinish.reason, usage: state.pendingFinish.usage })]
472485
: []
473486

474487
// =============================================================================

packages/llm/src/protocols/gemini.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import { Endpoint } from "../route/endpoint"
55
import { Framing } from "../route/framing"
66
import { Protocol } from "../route/protocol"
77
import {
8+
LLMEvent,
89
Usage,
910
type FinishReason,
10-
type LLMEvent,
1111
type LLMRequest,
1212
type MediaPart,
1313
type TextPart,
@@ -311,7 +311,7 @@ const mapFinishReason = (finishReason: string | undefined, hasToolCalls: boolean
311311

312312
const finish = (state: ParserState): ReadonlyArray<LLMEvent> =>
313313
state.finishReason || state.usage
314-
? [{ type: "request-finish", reason: mapFinishReason(state.finishReason, state.hasToolCalls), usage: state.usage }]
314+
? [LLMEvent.requestFinish({ reason: mapFinishReason(state.finishReason, state.hasToolCalls), usage: state.usage })]
315315
: []
316316

317317
const step = (state: ParserState, event: GeminiEvent) => {
@@ -332,14 +332,18 @@ const step = (state: ParserState, event: GeminiEvent) => {
332332

333333
for (const part of candidate.content.parts) {
334334
if ("text" in part && part.text.length > 0) {
335-
events.push({ type: part.thought ? "reasoning-delta" : "text-delta", text: part.text })
335+
events.push(
336+
part.thought
337+
? LLMEvent.reasoningDelta({ id: "reasoning-0", text: part.text })
338+
: LLMEvent.textDelta({ id: "text-0", text: part.text }),
339+
)
336340
continue
337341
}
338342

339343
if ("functionCall" in part) {
340344
const input = part.functionCall.args
341345
const id = `tool_${nextToolCallId++}`
342-
events.push({ type: "tool-call", id, name: part.functionCall.name, input })
346+
events.push(LLMEvent.toolCall({ id, name: part.functionCall.name, input }))
343347
hasToolCalls = true
344348
}
345349
}

packages/llm/src/protocols/openai-chat.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import { Framing } from "../route/framing"
66
import { HttpTransport } from "../route/transport"
77
import { Protocol } from "../route/protocol"
88
import {
9+
LLMEvent,
910
Usage,
1011
type FinishReason,
11-
type LLMEvent,
1212
type LLMRequest,
1313
type TextPart,
1414
type ToolCallPart,
@@ -312,7 +312,7 @@ const step = (state: ParserState, event: OpenAIChatEvent) =>
312312
const toolDeltas = delta?.tool_calls ?? []
313313
let tools = state.tools
314314

315-
if (delta?.content) events.push({ type: "text-delta", text: delta.content })
315+
if (delta?.content) events.push(LLMEvent.textDelta({ id: "text-0", text: delta.content }))
316316

317317
for (const tool of toolDeltas) {
318318
const result = ToolStream.appendOrStart(
@@ -350,7 +350,7 @@ const finishEvents = (state: ParserState): ReadonlyArray<LLMEvent> => {
350350
const reason = state.finishReason === "stop" && hasToolCalls ? "tool-calls" : state.finishReason
351351
return [
352352
...state.toolCallEvents,
353-
...(reason ? ([{ type: "request-finish", reason, usage: state.usage }] satisfies ReadonlyArray<LLMEvent>) : []),
353+
...(reason ? [LLMEvent.requestFinish({ reason, usage: state.usage })] : []),
354354
]
355355
}
356356

packages/llm/src/protocols/openai-responses.ts

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import { Framing } from "../route/framing"
66
import { HttpTransport, WebSocketTransport } from "../route/transport"
77
import { Protocol } from "../route/protocol"
88
import {
9+
LLMEvent,
910
Usage,
1011
type FinishReason,
11-
type LLMEvent,
1212
type LLMRequest,
1313
type ProviderMetadata,
1414
type TextPart,
@@ -348,22 +348,20 @@ const hostedToolEvents = (
348348
const tool = HOSTED_TOOLS[item.type]
349349
const providerMetadata = openaiMetadata({ itemId: item.id })
350350
return [
351-
{
352-
type: "tool-call",
351+
LLMEvent.toolCall({
353352
id: item.id,
354353
name: tool.name,
355354
input: tool.input(item),
356355
providerExecuted: true,
357356
providerMetadata,
358-
},
359-
{
360-
type: "tool-result",
357+
}),
358+
LLMEvent.toolResult({
361359
id: item.id,
362360
name: tool.name,
363361
result: hostedToolResult(item),
364362
providerExecuted: true,
365363
providerMetadata,
366-
},
364+
}),
367365
]
368366
}
369367

@@ -382,12 +380,7 @@ const onOutputTextDelta = (state: ParserState, event: OpenAIResponsesEvent): Ste
382380
return [
383381
state,
384382
[
385-
{
386-
type: "text-delta",
387-
id: event.item_id,
388-
text: event.delta,
389-
...(event.item_id ? { providerMetadata: openaiMetadata({ itemId: event.item_id }) } : {}),
390-
},
383+
LLMEvent.textDelta({ id: event.item_id ?? "text-0", text: event.delta }),
391384
],
392385
]
393386
}
@@ -458,30 +451,28 @@ const onOutputItemDone = Effect.fn("OpenAIResponses.onOutputItemDone")(function*
458451
const onResponseFinish = (state: ParserState, event: OpenAIResponsesEvent): StepResult => [
459452
state,
460453
[
461-
{
462-
type: "request-finish",
454+
LLMEvent.requestFinish({
463455
reason: mapFinishReason(event, state.hasFunctionCall),
464456
usage: mapUsage(event.response?.usage),
465-
...(event.response?.id || event.response?.service_tier
466-
? {
467-
providerMetadata: openaiMetadata({
457+
providerMetadata:
458+
event.response?.id || event.response?.service_tier
459+
? openaiMetadata({
468460
responseId: event.response.id,
469461
serviceTier: event.response.service_tier,
470-
}),
471-
}
472-
: {}),
473-
},
462+
})
463+
: undefined,
464+
}),
474465
],
475466
]
476467

477468
const onResponseFailed = (state: ParserState, event: OpenAIResponsesEvent): StepResult => [
478469
state,
479-
[{ type: "provider-error", message: event.message ?? event.code ?? "OpenAI Responses response failed" }],
470+
[LLMEvent.providerError({ message: event.message ?? event.code ?? "OpenAI Responses response failed" })],
480471
]
481472

482473
const onError = (state: ParserState, event: OpenAIResponsesEvent): StepResult => [
483474
state,
484-
[{ type: "provider-error", message: event.message ?? event.code ?? "OpenAI Responses stream error" }],
475+
[LLMEvent.providerError({ message: event.message ?? event.code ?? "OpenAI Responses stream error" })],
485476
]
486477

487478
const step = (state: ParserState, event: OpenAIResponsesEvent) => {

packages/llm/src/protocols/utils/tool-stream.ts

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Effect } from "effect"
2-
import { LLMError, type ProviderMetadata, type ToolCall, type ToolInputDelta } from "../../schema"
2+
import { LLMError, LLMEvent, type ProviderMetadata, type ToolCall, type ToolInputDelta } from "../../schema"
33
import { eventError, parseToolInput, type ToolAccumulator } from "../shared"
44

55
type StreamKey = string | number
@@ -49,34 +49,24 @@ const withoutTool = <K extends StreamKey>(tools: State<K>, key: K): State<K> =>
4949
return next
5050
}
5151

52-
const inputDelta = (tool: PendingTool, text: string): ToolInputDelta => ({
53-
type: "tool-input-delta",
54-
id: tool.id,
55-
name: tool.name,
56-
text,
57-
...(tool.providerMetadata ? { providerMetadata: tool.providerMetadata } : {}),
58-
})
52+
const inputDelta = (tool: PendingTool, text: string): ToolInputDelta =>
53+
LLMEvent.toolInputDelta({
54+
id: tool.id,
55+
name: tool.name,
56+
text,
57+
})
5958

6059
const toolCall = (route: string, tool: PendingTool, inputOverride?: string) =>
6160
parseToolInput(route, tool.name, inputOverride ?? tool.input).pipe(
6261
Effect.map(
6362
(input): ToolCall =>
64-
tool.providerExecuted
65-
? {
66-
type: "tool-call",
67-
id: tool.id,
68-
name: tool.name,
69-
input,
70-
providerExecuted: true,
71-
...(tool.providerMetadata ? { providerMetadata: tool.providerMetadata } : {}),
72-
}
73-
: {
74-
type: "tool-call",
75-
id: tool.id,
76-
name: tool.name,
77-
input,
78-
...(tool.providerMetadata ? { providerMetadata: tool.providerMetadata } : {}),
79-
},
63+
LLMEvent.toolCall({
64+
id: tool.id,
65+
name: tool.name,
66+
input,
67+
providerExecuted: tool.providerExecuted ? true : undefined,
68+
providerMetadata: tool.providerMetadata,
69+
}),
8070
),
8171
)
8272

0 commit comments

Comments
 (0)