
Commit 971118e (parent 84a3a3e)

feat(provider): add bridged stream and native stream implementations (#23)

7 files changed: 733 additions & 22 deletions

File: docs/internals/llm-streams.md
36 additions & 8 deletions
```diff
@@ -1,20 +1,24 @@
 # LLM Stream Core
 
-This document describes the first part of the Layer 3 stream pipeline.
+This document describes the current Layer 3 stream pipeline.
 
-The current implementation introduces two building blocks:
+The current implementation introduces four building blocks:
 
 - `sse_reader` in `gateway::streams::reader`
 - `HubChunkStream` in `gateway::streams::hub`
+- `BridgedStream` in `gateway::streams::bridged`
+- `NativeStream` in `gateway::streams::native`
 
 ## Scope
 
-This slice only covers the hub-facing stream foundation.
+This slice now covers both hub-facing and native stream adapters.
 
 - `sse_reader` turns a byte stream into complete SSE lines.
 - `HubChunkStream` turns provider stream lines into hub `ChatCompletionChunk` values.
+- `BridgedStream` turns hub chunks into a concrete `ChatFormat` stream.
+- `NativeStream` bypasses hub chunks and lets a `ChatFormat` decode native provider stream lines directly.
 
-`BridgedStream` and `NativeStream` are intentionally deferred to later steps.
+Gateway request execution still sits in a later step, but the reusable stream adapters are now in place.
 
 ## `sse_reader`
 
```
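The splitting rule described for `sse_reader` (release only complete lines, keep the partial tail buffered) can be sketched synchronously. This `SseLineBuffer` is a hypothetical stand-in, not the real async reader in `gateway::streams::reader`:

```rust
// Hypothetical synchronous sketch of the SSE line-splitting rule.
// The real `sse_reader` is an async adapter; only the buffering idea
// is illustrated here.
pub struct SseLineBuffer {
    buf: Vec<u8>,
}

impl SseLineBuffer {
    pub fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Feed one chunk of bytes; return every complete line it unlocked.
    /// A trailing partial line stays buffered until more bytes arrive.
    pub fn push(&mut self, bytes: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(bytes);
        let mut lines = Vec::new();
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let mut line: Vec<u8> = self.buf.drain(..=pos).collect();
            line.pop(); // drop the `\n`
            if line.last() == Some(&b'\r') {
                line.pop(); // tolerate CRLF terminators
            }
            lines.push(String::from_utf8_lossy(&line).into_owned());
        }
        lines
    }
}

fn main() {
    let mut r = SseLineBuffer::new();
    // A frame split across two reads: the first read yields nothing.
    assert!(r.push(b"data: {\"id\":").is_empty());
    let lines = r.push(b"\"1\"}\n\n");
    assert_eq!(lines, vec!["data: {\"id\":\"1\"}".to_string(), String::new()]);
}
```

The buffer-the-tail rule is what lets a reader consume network reads of arbitrary size without ever emitting a torn SSE line.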
```diff
@@ -41,12 +45,35 @@ Its polling behavior is deliberately ordered:
 
 That fixes the earlier class of bug where a provider transform could return multiple chunks for one raw input line and only the first chunk would be observed.
 
+## `BridgedStream`
+
+`BridgedStream` sits one layer above `HubChunkStream`.
+
+Its behavior mirrors the hub adapter:
+
+1. drain any already buffered format-specific chunks
+2. poll `HubChunkStream` only when that buffer is empty
+3. call `ChatFormat::from_hub_stream()` for each hub chunk
+4. return the first bridged chunk immediately and queue the rest
+
+When the hub stream ends, `BridgedStream` also calls `ChatFormat::stream_end_events()` so formats can emit explicit terminators such as final SSE events.
+
+## `NativeStream`
+
+`NativeStream` is the direct counterpart for native-format paths.
+
+Instead of going through hub `ChatCompletionChunk` values, it passes each raw provider stream line to `ChatFormat::transform_native_stream_chunk()`. Buffering rules are the same: if one input line expands into multiple output items, the adapter returns the first one immediately and preserves the rest for later polls.
+
 ## Usage Accumulation
 
-`HubChunkStream` also centralizes usage tracking.
+`HubChunkStream` still centralizes hub-token tracking.
 
 Whenever a transformed hub chunk carries `usage`, the stream copies `prompt_tokens` and `completion_tokens` into `ChatStreamState`. This keeps token accounting outside individual provider transforms while still making the latest usage totals available to later pipeline stages.
 
+`BridgedStream` reports those latest hub totals through a oneshot channel on both normal completion and premature drop. It only fills fields that were actually observed in the hub stream, and it derives `total_tokens` when both sides are known.
+
+`NativeStream` exposes the same completion and drop hook through `ChatFormat::native_usage()`. Formats that do not override that hook still send an empty `Usage` value, but native-capable formats can now report their own accumulated usage snapshot without coupling the generic stream adapter to any one state shape.
+
 ## Stream State
 
 `ChatStreamState` now carries both aggregation data and provider stream metadata.
```
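The numbered drain-then-poll steps the doc describes for `BridgedStream` (and shares with `HubChunkStream` and `NativeStream`) can be sketched as a synchronous iterator. The adapter name and the transform closure below are illustrative, not the gateway's real async types:

```rust
// Illustrative synchronous sketch of the shared buffering discipline:
// one input item may fan out into several outputs; the first is returned
// immediately and the rest are queued for later polls.
use std::collections::VecDeque;

pub struct BufferedTransform<I, F> {
    inner: I,
    transform: F,
    pending: VecDeque<String>,
}

impl<I, F> BufferedTransform<I, F>
where
    I: Iterator<Item = String>,
    F: FnMut(&str) -> Vec<String>,
{
    pub fn new(inner: I, transform: F) -> Self {
        Self { inner, transform, pending: VecDeque::new() }
    }
}

impl<I, F> Iterator for BufferedTransform<I, F>
where
    I: Iterator<Item = String>,
    F: FnMut(&str) -> Vec<String>,
{
    type Item = String;

    fn next(&mut self) -> Option<String> {
        loop {
            // 1. drain already buffered outputs first
            if let Some(item) = self.pending.pop_front() {
                return Some(item);
            }
            // 2. poll the inner stream only when the buffer is empty
            let raw = self.inner.next()?;
            // 3. one raw item may expand into zero or more outputs
            self.pending.extend((self.transform)(&raw));
            // 4. loop back: emit the first output, keep the rest queued
        }
    }
}

fn main() {
    let lines = vec!["ab".to_string(), "c".to_string()];
    let out: Vec<String> = BufferedTransform::new(
        lines.into_iter(),
        |l| l.chars().map(|c| c.to_string()).collect(),
    )
    .collect();
    assert_eq!(out, vec!["a", "b", "c"]);
}
```

Polling the inner stream only when `pending` is empty is exactly what prevents the earlier class of bug where extra chunks produced from one input line were silently dropped.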
```diff
@@ -63,8 +90,9 @@ Those metadata fields are required because some providers only emit response ide
 
 This implementation is intentionally narrow.
 
-- only the SSE reader is implemented in this slice
+- only the SSE reader kind is implemented in this slice
 - `JsonArrayStream` and `AwsEventStream` readers are still future work
-- no format bridging happens here yet; this stream only produces hub chunks
+- the legacy providers under `src/providers/` still keep their own SSE splitting logic
+- no production native format has started overriding `ChatFormat::native_usage()` yet
 
-That keeps the first stream-layer step focused on correctness of buffering, polling order, and usage propagation.
+That keeps the stream-layer work focused on buffering correctness, polling order, and handoff between provider, hub, and format-specific stream representations.
```
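The completion-and-drop usage hand-off described in the doc can be sketched with a `Drop` guard. All names here are illustrative: a std `mpsc` channel stands in for the oneshot channel, and `Usage` is simplified; only the rule "fill fields that were observed, derive `total_tokens` when both sides are known" is taken from the doc:

```rust
// Illustrative sketch: report the latest observed token totals exactly
// once, whether the stream finished normally or was dropped early.
use std::sync::mpsc;

#[derive(Debug, Default, PartialEq)]
pub struct Usage {
    pub prompt_tokens: Option<u32>,
    pub completion_tokens: Option<u32>,
    pub total_tokens: Option<u32>,
}

pub fn snapshot_usage(prompt: Option<u32>, completion: Option<u32>) -> Usage {
    Usage {
        prompt_tokens: prompt,
        completion_tokens: completion,
        // derive the total only when both sides were actually observed
        total_tokens: match (prompt, completion) {
            (Some(p), Some(c)) => Some(p + c),
            _ => None,
        },
    }
}

/// Hypothetical guard: sends the snapshot on drop, covering both normal
/// completion and premature drop through the same path.
pub struct UsageReporter {
    tx: Option<mpsc::Sender<Usage>>,
    pub prompt: Option<u32>,
    pub completion: Option<u32>,
}

impl Drop for UsageReporter {
    fn drop(&mut self) {
        if let Some(tx) = self.tx.take() {
            let _ = tx.send(snapshot_usage(self.prompt, self.completion));
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    {
        let mut r = UsageReporter { tx: Some(tx), prompt: None, completion: None };
        r.prompt = Some(10);
        r.completion = Some(5);
        // dropped here: the snapshot is sent regardless of how we exit
    }
    let usage = rx.recv().unwrap();
    assert_eq!(usage.total_tokens, Some(15));
}
```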

File: src/gateway/providers/anthropic/transform.rs
11 additions & 7 deletions
```diff
@@ -108,7 +108,7 @@ pub(crate) fn parse_anthropic_sse_to_openai(
             state.response_id = Some(message.id.clone());
             state.response_model = Some(message.model.clone());
             state.response_created = Some(now_unix_secs());
-            state.input_tokens = message.usage.input_tokens;
+            state.input_tokens = Some(message.usage.input_tokens);
 
             Ok(vec![ChatCompletionChunk {
                 id: message.id,
```
```diff
@@ -206,11 +206,18 @@ pub(crate) fn parse_anthropic_sse_to_openai(
             }
         },
         AnthropicStreamEvent::MessageDelta { delta, usage } => {
-            state.output_tokens = usage.output_tokens;
+            state.output_tokens = Some(usage.output_tokens);
             if usage.input_tokens > 0 {
-                state.input_tokens = usage.input_tokens;
+                state.input_tokens = Some(usage.input_tokens);
             }
 
+            let usage = match (state.input_tokens, state.output_tokens) {
+                (Some(input_tokens), Some(output_tokens)) => {
+                    Some(stream_usage_to_openai_usage(input_tokens, output_tokens))
+                }
+                _ => None,
+            };
+
             Ok(vec![build_stream_chunk(
                 state,
                 ChatCompletionChunkDelta {
@@ -219,10 +226,7 @@ pub(crate) fn parse_anthropic_sse_to_openai(
                     tool_calls: None,
                 },
                 map_anthropic_stop_reason(delta.stop_reason.as_deref()),
-                Some(stream_usage_to_openai_usage(
-                    state.input_tokens,
-                    state.output_tokens,
-                )),
+                usage,
             )?])
         }
         AnthropicStreamEvent::ContentBlockStop { .. }
```
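The gating this hunk introduces can be exercised in isolation. `Usage` and `stream_usage_to_openai_usage` below are simplified stand-ins for the gateway types; the `match` mirrors the diff's pairing of `state.input_tokens` and `state.output_tokens`:

```rust
// Simplified stand-in for the gateway's usage type.
#[derive(Debug, PartialEq)]
pub struct Usage {
    pub prompt_tokens: u32,
    pub completion_tokens: u32,
    pub total_tokens: u32,
}

// Stand-in for the real `stream_usage_to_openai_usage` helper.
fn stream_usage_to_openai_usage(input_tokens: u32, output_tokens: u32) -> Usage {
    Usage {
        prompt_tokens: input_tokens,
        completion_tokens: output_tokens,
        total_tokens: input_tokens + output_tokens,
    }
}

/// Emit usage only when both counters were actually observed; with the
/// previous non-`Option` fields, an unseen counter was reported as-is.
pub fn usage_for_chunk(input: Option<u32>, output: Option<u32>) -> Option<Usage> {
    match (input, output) {
        (Some(i), Some(o)) => Some(stream_usage_to_openai_usage(i, o)),
        _ => None,
    }
}

fn main() {
    assert_eq!(usage_for_chunk(None, Some(7)), None);
    let u = usage_for_chunk(Some(12), Some(7)).unwrap();
    assert_eq!(u.total_tokens, 19);
}
```

The `Option` pairing lets a `message_delta` event that arrives before any `input_tokens` were seen omit usage entirely instead of reporting a misleading partial total.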
