Skip to content

Commit 5e30f1e

Browse files
committed
fix(streaming): typed provider errors and eof completion after terminal finish
1 parent 0fc13fa commit 5e30f1e

41 files changed

Lines changed: 1550 additions & 622 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Sources/AgentRunKit/Core/Streaming/StreamCompletion.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ import Foundation
44
public struct StreamCompletionDiagnostics: Sendable, Equatable {
55
public let elapsed: Duration
66
public let eventsObserved: Int
7+
/// `true` when the stream ended through the provider's own completion signal; `false` when completion was inferred at EOF.
8+
public let terminalMarkerSeen: Bool
79

8-
public init(elapsed: Duration, eventsObserved: Int) {
10+
public init(elapsed: Duration, eventsObserved: Int, terminalMarkerSeen: Bool) {
911
self.elapsed = elapsed
1012
self.eventsObserved = eventsObserved
13+
self.terminalMarkerSeen = terminalMarkerSeen
1114
}
1215
}
1316

Sources/AgentRunKit/Core/Streaming/StreamProcessor.swift

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ private struct AudioAccumulator {
8484

8585
private struct StreamAccumulation {
8686
let eventFactory: StreamEventFactory
87+
let provider: ProviderIdentifier
8788
let started: ContinuousClock.Instant = .now
8889
var content = ""
8990
var reasoning = ""
@@ -96,6 +97,9 @@ private struct StreamAccumulation {
9697
var yieldedEvent = false
9798
var eventsObserved = 0
9899
var sawFinished = false
100+
/// Every backend except OpenAI chat can complete only through its own terminal marker, so a stream
101+
/// that never reports `.streamClosed` has necessarily seen one; the chat path overrides this at EOF.
102+
var terminalMarkerSeen = true
99103

100104
mutating func apply(
101105
_ input: RunStreamElement,
@@ -155,6 +159,8 @@ private struct StreamAccumulation {
155159
guard let iterationUsage else { return }
156160
totalUsage += iterationUsage
157161
usage = iterationUsage
162+
case let .streamClosed(markerSeen):
163+
terminalMarkerSeen = markerSeen
158164
}
159165
}
160166

@@ -222,14 +228,20 @@ private struct StreamAccumulation {
222228

223229
var diagnostics: StreamFailureDiagnostics {
224230
StreamFailureDiagnostics(
231+
provider: provider,
225232
elapsed: ContinuousClock.now - started,
226233
eventsObserved: eventsObserved,
234+
finishSignalSeen: sawFinished,
227235
lastEvent: nil
228236
)
229237
}
230238

231239
var completionDiagnostics: StreamCompletionDiagnostics {
232-
StreamCompletionDiagnostics(elapsed: ContinuousClock.now - started, eventsObserved: eventsObserved)
240+
StreamCompletionDiagnostics(
241+
elapsed: ContinuousClock.now - started,
242+
eventsObserved: eventsObserved,
243+
terminalMarkerSeen: terminalMarkerSeen
244+
)
233245
}
234246

235247
func streamFailure(reason: MalformedStreamReason) -> AgentError {
@@ -263,7 +275,7 @@ struct StreamProcessor {
263275
requestContext: RequestContext? = nil,
264276
requestMode: RunRequestMode = .auto
265277
) async throws -> StreamIteration {
266-
var state = StreamAccumulation(eventFactory: eventFactory)
278+
var state = StreamAccumulation(eventFactory: eventFactory, provider: client.providerIdentifier)
267279
let eventObserver = requestContext?.onStreamEvent
268280
let completionObserver = requestContext?.onStreamComplete
269281

Sources/AgentRunKit/Documentation.docc/Articles/LLMProviders.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,23 @@ All clients accept a ``RetryPolicy`` controlling retry behavior on transient fai
217217
| `maxAttempts` | 3 | Total attempts before failing |
218218
| `baseDelay` | 1 second | Initial backoff duration |
219219
| `maxDelay` | 30 seconds | Cap on exponential backoff |
220-
| `streamStallTimeout` | nil | Restarts a stream if no delta arrives within this duration |
220+
| `streamStallTimeout` | nil | Fails a stream with ``StreamFailure/idleTimeout(diagnostics:)`` when no bytes arrive within this duration |
221221

222222
Two static presets: `.default` (3 attempts, 1s base, 30s max) and `.none` (single attempt, no retries).
223223

224+
Retries apply before a stream starts; once a 2xx byte stream is open, failures propagate to the caller as typed ``StreamFailure`` values rather than being retried.
225+
226+
## Stream Termination
227+
228+
Each streaming transport defines exactly which wire conditions end a stream successfully; anything else throws a typed ``StreamFailure``.
229+
230+
- OpenAI-compatible Chat Completions: a `data: [DONE]` sentinel completes the stream immediately. A terminal non-`error` `finish_reason` also marks the turn complete, so a stream that reaches EOF after one, without a `[DONE]` sentinel, still completes successfully; that case is reported with ``StreamCompletionDiagnostics/terminalMarkerSeen`` set to `false`. Frames carrying a top-level `error` payload, or `finish_reason: "error"`, throw ``StreamFailure/providerError(code:message:diagnostics:)`` with the upstream code and message preserved. EOF with no finish signal throws ``StreamFailure/providerTerminationMissing(diagnostics:)``, and a `[DONE]` with no preceding finish signal throws ``StreamFailure/finishedDeltaMissing(diagnostics:)``.
231+
- Anthropic (and Vertex Anthropic): `message_stop` completes the stream; `error` events throw ``StreamFailure/providerError(code:message:diagnostics:)``; EOF before `message_stop` throws ``StreamFailure/providerTerminationMissing(diagnostics:)``.
232+
- Gemini (and Vertex Gemini): a chunk with `finishReason` completes the stream; error envelopes throw ``StreamFailure/providerError(code:message:diagnostics:)``.
233+
- Responses API: `response.completed` and `response.incomplete` complete the stream; `response.failed`, `response.error`, and standalone `error` events throw ``StreamFailure/providerError(code:message:diagnostics:)``.
234+
235+
Every failure carries ``StreamFailureDiagnostics`` identifying the provider, elapsed time, events observed, and whether a finish signal had been seen before the failure.
236+
224237
```swift
225238
let client = OpenAIClient(
226239
apiKey: "sk-...",

Sources/AgentRunKit/LLM/Core/StreamDelta.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ public enum StreamDelta: Sendable, Equatable {
1111
case audioTranscript(String)
1212
case audioStarted(id: String, expiresAt: Int)
1313
case finished(usage: TokenUsage?)
14+
/// Emitted at most once at stream end; absent from backends that can only complete via their own terminal marker.
15+
case streamClosed(terminalMarkerSeen: Bool)
1416
}

Sources/AgentRunKit/LLM/Providers/Anthropic/AnthropicClient.swift

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public struct AnthropicClient: LLMClient, Sendable {
141141
urlRequest: urlRequest, session: session, retryPolicy: retryPolicy
142142
)
143143
requestContext?.onResponse?(httpResponse)
144-
return try parseResponse(data)
144+
return try parseResponse(data, provider: providerIdentifier)
145145
}
146146

147147
public func stream(
@@ -411,14 +411,18 @@ extension AnthropicClient {
411411
return try buildJSONPostRequest(url: url, body: request, headers: headerMap)
412412
}
413413

414-
func parseResponse(_ data: Data) throws -> AssistantMessage {
414+
func parseResponse(_ data: Data, provider: ProviderIdentifier) throws -> AssistantMessage {
415415
let response: AnthropicResponse
416416
do {
417417
response = try JSONDecoder().decode(AnthropicResponse.self, from: data)
418418
} catch let decodingError {
419419
if let err = try? JSONDecoder().decode(AnthropicErrorResponse.self, from: data),
420420
err.type == "error" {
421-
throw AgentError.llmError(.other("\(err.error.type): \(err.error.message)"))
421+
throw AgentError.llmError(.providerError(
422+
provider: provider,
423+
code: err.error.type,
424+
message: err.error.message
425+
))
422426
}
423427
throw AgentError.llmError(.decodingFailed(decodingError))
424428
}

Sources/AgentRunKit/LLM/Providers/Anthropic/AnthropicClientStreaming.swift

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ extension AnthropicClient {
2424

2525
try await processSSEStream(
2626
bytes: bytes,
27+
provider: providerIdentifier,
2728
stallTimeout: retryPolicy.streamStallTimeout
2829
) { event, diagnostics in
2930
try await self.handleSSEEvent(
30-
event, state: state, providerIdentifier: self.providerIdentifier,
31-
diagnostics: diagnostics, continuation: continuation
31+
event, state: state, diagnostics: diagnostics, continuation: continuation
3232
)
3333
}
3434
continuation.finish()
@@ -57,10 +57,11 @@ extension AnthropicClient {
5757

5858
try await processSSEStream(
5959
bytes: bytes,
60+
provider: providerIdentifier,
6061
stallTimeout: retryPolicy.streamStallTimeout
6162
) { event, diagnostics in
6263
try await self.handleSSEEvent(
63-
event, state: state, providerIdentifier: self.providerIdentifier, diagnostics: diagnostics
64+
event, state: state, diagnostics: diagnostics
6465
) { delta in
6566
continuation.yield(.delta(delta))
6667
}
@@ -79,28 +80,23 @@ extension AnthropicClient {
7980
func handleSSEEvent(
8081
_ event: SSEEvent,
8182
state: AnthropicStreamState,
82-
providerIdentifier: ProviderIdentifier = .anthropic,
8383
diagnostics: StreamFailureDiagnostics,
8484
continuation: AsyncThrowingStream<StreamDelta, Error>.Continuation
85-
) async throws -> Bool {
86-
try await handleSSEEvent(
87-
event, state: state, providerIdentifier: providerIdentifier, diagnostics: diagnostics
88-
) { delta in
85+
) async throws -> SSEDisposition {
86+
try await handleSSEEvent(event, state: state, diagnostics: diagnostics) { delta in
8987
continuation.yield(delta)
9088
}
9189
}
9290

9391
func handleSSEEvent(
9492
_ event: SSEEvent,
9593
state: AnthropicStreamState,
96-
providerIdentifier: ProviderIdentifier = .anthropic,
9794
diagnostics: StreamFailureDiagnostics,
9895
yield: @Sendable (StreamDelta) -> Void
99-
) async throws -> Bool {
96+
) async throws -> SSEDisposition {
10097
try await handleSSEPayload(
10198
event.data,
10299
state: state,
103-
providerIdentifier: providerIdentifier,
104100
diagnostics: diagnostics,
105101
yield: yield
106102
)
@@ -109,27 +105,25 @@ extension AnthropicClient {
109105
private func handleSSEPayload(
110106
_ payload: String,
111107
state: AnthropicStreamState,
112-
providerIdentifier: ProviderIdentifier,
113108
diagnostics: StreamFailureDiagnostics,
114109
yield: @Sendable (StreamDelta) -> Void
115-
) async throws -> Bool {
110+
) async throws -> SSEDisposition {
116111
let event = try decodeEvent(AnthropicEventTypeOnly.self, from: Data(payload.utf8))
117-
guard let eventType = event.type else { return false }
112+
guard let eventType = event.type else { return .continue }
118113

119114
return try await dispatchEvent(
120115
eventType, data: Data(payload.utf8),
121-
state: state, providerIdentifier: providerIdentifier, diagnostics: diagnostics, yield: yield
116+
state: state, diagnostics: diagnostics, yield: yield
122117
)
123118
}
124119

125120
private func dispatchEvent(
126121
_ type: AnthropicSSEEvent,
127122
data: Data,
128123
state: AnthropicStreamState,
129-
providerIdentifier: ProviderIdentifier,
130124
diagnostics: StreamFailureDiagnostics,
131125
yield: @Sendable (StreamDelta) -> Void
132-
) async throws -> Bool {
126+
) async throws -> SSEDisposition {
133127
switch type {
134128
case .messageStart:
135129
try await handleMessageStart(data: data, state: state)
@@ -152,11 +146,11 @@ extension AnthropicClient {
152146
case .messageStop:
153147
await state.markCompleted()
154148
await yield(.finished(usage: state.finalUsage()))
155-
return true
149+
return .complete
156150
case .error:
157-
try handleError(data: data, providerIdentifier: providerIdentifier)
151+
try handleError(data: data, diagnostics: diagnostics)
158152
}
159-
return false
153+
return .continue
160154
}
161155

162156
private func handleBlockStart(
@@ -274,12 +268,12 @@ extension AnthropicClient {
274268
await state.setOutputTokens(event.usage?.outputTokens ?? 0)
275269
}
276270

277-
private func handleError(data: Data, providerIdentifier: ProviderIdentifier) throws {
271+
private func handleError(data: Data, diagnostics: StreamFailureDiagnostics) throws {
278272
let event = try decodeEvent(AnthropicStreamErrorEvent.self, from: data)
279273
throw AgentError.llmError(.streamFailed(.providerError(
280-
provider: providerIdentifier,
281274
code: event.error.type,
282-
message: event.error.message
275+
message: event.error.message,
276+
diagnostics: diagnostics
283277
)))
284278
}
285279

Sources/AgentRunKit/LLM/Providers/Gemini/GeminiClient.swift

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public struct GeminiClient: LLMClient, Sendable {
5858
urlRequest: urlRequest, session: session, retryPolicy: retryPolicy
5959
)
6060
requestContext?.onResponse?(httpResponse)
61-
return try parseResponse(data)
61+
return try parseResponse(data, provider: providerIdentifier)
6262
}
6363

6464
public func stream(
@@ -250,9 +250,13 @@ extension GeminiClient {
250250
return str
251251
}
252252

253-
func parseResponse(_ data: Data) throws -> AssistantMessage {
253+
func parseResponse(_ data: Data, provider: ProviderIdentifier) throws -> AssistantMessage {
254254
if let err = try? JSONDecoder().decode(GeminiErrorResponse.self, from: data) {
255-
throw AgentError.llmError(.other("\(err.error.status): \(err.error.message)"))
255+
throw AgentError.llmError(.providerError(
256+
provider: provider,
257+
code: err.error.status,
258+
message: err.error.message
259+
))
256260
}
257261

258262
let response: GeminiResponse

Sources/AgentRunKit/LLM/Providers/Gemini/GeminiClientStreaming.swift

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ extension GeminiClient {
2828

2929
try await processSSEStream(
3030
bytes: bytes,
31+
provider: providerIdentifier,
3132
stallTimeout: retryPolicy.streamStallTimeout
32-
) { event, _ in
33+
) { event, diagnostics in
3334
try await self.handleSSEEvent(
34-
event, state: state, providerIdentifier: self.providerIdentifier, continuation: continuation
35+
event, state: state, diagnostics: diagnostics, continuation: continuation
3536
)
3637
}
3738
continuation.finish()
@@ -40,28 +41,28 @@ extension GeminiClient {
4041
func handleSSEEvent(
4142
_ event: SSEEvent,
4243
state: GeminiStreamState,
43-
providerIdentifier: ProviderIdentifier = .gemini,
44+
diagnostics: StreamFailureDiagnostics,
4445
continuation: AsyncThrowingStream<StreamDelta, Error>.Continuation
45-
) async throws -> Bool {
46+
) async throws -> SSEDisposition {
4647
try await handleSSEPayload(
47-
event.data, state: state, providerIdentifier: providerIdentifier, continuation: continuation
48+
event.data, state: state, diagnostics: diagnostics, continuation: continuation
4849
)
4950
}
5051

5152
private func handleSSEPayload(
5253
_ payload: String,
5354
state: GeminiStreamState,
54-
providerIdentifier: ProviderIdentifier,
55+
diagnostics: StreamFailureDiagnostics,
5556
continuation: AsyncThrowingStream<StreamDelta, Error>.Continuation
56-
) async throws -> Bool {
57+
) async throws -> SSEDisposition {
5758
let data = Data(payload.utf8)
5859

5960
if let errorResponse = try? JSONDecoder().decode(GeminiErrorResponse.self, from: data) {
6061
throw AgentError.llmError(
6162
.streamFailed(.providerError(
62-
provider: providerIdentifier,
6363
code: errorResponse.error.status,
64-
message: errorResponse.error.message
64+
message: errorResponse.error.message,
65+
diagnostics: diagnostics
6566
))
6667
)
6768
}
@@ -73,7 +74,7 @@ extension GeminiClient {
7374
throw AgentError.llmError(.decodingFailed(error))
7475
}
7576

76-
guard let candidate = response.candidates?.first else { return false }
77+
guard let candidate = response.candidates?.first else { return .continue }
7778

7879
for part in candidate.content?.parts ?? [] {
7980
if let functionCall = part.functionCall {
@@ -120,10 +121,10 @@ extension GeminiClient {
120121
}
121122

122123
continuation.yield(.finished(usage: response.usageMetadata?.tokenUsage))
123-
return true
124+
return .complete
124125
}
125126

126-
return false
127+
return .continue
127128
}
128129
}
129130

0 commit comments

Comments
 (0)