Skip to content

Commit 84a3a3e

Browse files
authored
feat(provider): add stream core (#22)
1 parent 0c7b13c commit 84a3a3e

5 files changed

Lines changed: 409 additions & 0 deletions

File tree

docs/internals/llm-streams.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# LLM Stream Core
2+
3+
This document describes the first part of the Layer 3 stream pipeline.
4+
5+
The current implementation introduces two building blocks:
6+
7+
- `sse_reader` in `gateway::streams::reader`
8+
- `HubChunkStream` in `gateway::streams::hub`
9+
10+
## Scope
11+
12+
This slice only covers the hub-facing stream foundation.
13+
14+
- `sse_reader` turns a byte stream into complete SSE lines.
15+
- `HubChunkStream` turns provider stream lines into hub `ChatCompletionChunk` values.
16+
17+
`BridgedStream` and `NativeStream` are intentionally deferred to later steps.
18+
19+
## `sse_reader`
20+
21+
`sse_reader` keeps the contract simple: it emits raw SSE lines as strings.
22+
23+
Three details matter:
24+
25+
- it preserves the original line content instead of stripping `data:` prefixes
26+
- it appends a synthetic trailing newline so the last partial line is flushed on EOF
27+
- it drops empty separator lines so downstream transforms only see meaningful records
28+
29+
That behavior matches the current provider transforms, which already parse SSE framing themselves.
30+
31+
## `HubChunkStream`
32+
33+
`HubChunkStream` is the first stream adapter that works on top of provider transforms.
34+
35+
Its polling behavior is deliberately ordered:
36+
37+
1. drain the internal buffer first
38+
2. poll the raw line stream only when the buffer is empty
39+
3. call `ProviderCapabilities::transform_stream_chunk()` on each raw line
40+
4. return the first produced hub chunk immediately and queue the rest
41+
42+
That fixes the earlier class of bug where a provider transform could return multiple chunks for one raw input line and only the first chunk would be observed.
43+
44+
## Usage Accumulation
45+
46+
`HubChunkStream` also centralizes usage tracking.
47+
48+
Whenever a transformed hub chunk carries `usage`, the stream copies `prompt_tokens` and `completion_tokens` into `ChatStreamState`. This keeps token accounting outside individual provider transforms while still making the latest usage totals available to later pipeline stages.
49+
50+
## Stream State
51+
52+
`ChatStreamState` now carries both aggregation data and provider stream metadata.
53+
54+
It currently tracks:
55+
56+
- buffered tool call assembly state
57+
- latest input and output token counts
58+
- streamed response metadata such as `id`, `model`, and `created`
59+
60+
Those metadata fields are required because some providers only emit response identity once at stream start, while later events still need to be converted into well-formed hub chunks.
61+
62+
## Current Limits
63+
64+
This implementation is intentionally narrow.
65+
66+
- only the SSE reader is implemented in this slice
67+
- `JsonArrayStream` and `AwsEventStream` readers are still future work
68+
- no format bridging happens here yet; this stream only produces hub chunks
69+
70+
That keeps the first stream-layer step focused on correctness of buffering, polling order, and usage propagation.

src/gateway/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ pub mod error;
22
pub mod formats;
33
pub mod provider_instance;
44
pub mod providers;
5+
pub mod streams;
56
pub mod traits;
67
pub mod types;

src/gateway/streams/hub.rs

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
use std::{
2+
collections::VecDeque,
3+
pin::Pin,
4+
sync::Arc,
5+
task::{Context, Poll},
6+
};
7+
8+
use futures::Stream;
9+
use pin_project::pin_project;
10+
11+
use crate::gateway::{
12+
error::Result,
13+
traits::{ChatStreamState, ProviderCapabilities},
14+
types::openai::ChatCompletionChunk,
15+
};
16+
17+
/// Buffered hub stream adapter for provider-produced raw stream lines.
18+
///
19+
/// `HubChunkStream` preserves output ordering when one raw input item expands
20+
/// into multiple `ChatCompletionChunk` values. The first transformed chunk is
21+
/// returned immediately and the remaining chunks are queued in `buffer` for
22+
/// subsequent polls.
23+
///
24+
/// The stream mutates `state` as transformed chunks flow through it. In
25+
/// particular, provider-specific stream metadata and the latest observed usage
26+
/// totals are accumulated there so later pipeline stages can inspect them.
27+
/// Provider-specific transformation behavior is delegated to `def`, held as an
28+
/// `Arc<dyn ProviderCapabilities>`.
29+
#[pin_project]
30+
pub struct HubChunkStream {
31+
#[pin]
32+
inner: Pin<Box<dyn Stream<Item = Result<String>> + Send>>,
33+
def: Arc<dyn ProviderCapabilities>,
34+
pub(crate) state: ChatStreamState,
35+
buffer: VecDeque<ChatCompletionChunk>,
36+
}
37+
38+
impl HubChunkStream {
39+
/// Creates a `HubChunkStream` from raw provider stream lines.
40+
///
41+
/// The input stream must preserve line order. The returned stream stays
42+
/// `Send` as long as the input stream is `Send`, and every polled raw line
43+
/// is transformed through the supplied provider definition.
44+
pub fn new(
45+
inner: impl Stream<Item = Result<String>> + Send + 'static,
46+
def: Arc<dyn ProviderCapabilities>,
47+
) -> Self {
48+
Self {
49+
inner: Box::pin(inner),
50+
def,
51+
state: ChatStreamState::default(),
52+
buffer: VecDeque::new(),
53+
}
54+
}
55+
}
56+
57+
impl Stream for HubChunkStream {
58+
type Item = Result<ChatCompletionChunk>;
59+
60+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61+
let mut this = self.project();
62+
63+
if let Some(chunk) = this.buffer.pop_front() {
64+
return Poll::Ready(Some(Ok(chunk)));
65+
}
66+
67+
loop {
68+
match this.inner.as_mut().poll_next(cx) {
69+
Poll::Ready(Some(Ok(raw))) => {
70+
match this.def.transform_stream_chunk(&raw, this.state) {
71+
Ok(chunks) => {
72+
if chunks.is_empty() {
73+
continue;
74+
}
75+
76+
this.state.chunk_index += chunks.len();
77+
for chunk in &chunks {
78+
if let Some(usage) = &chunk.usage {
79+
this.state.input_tokens = usage.prompt_tokens;
80+
this.state.output_tokens = usage.completion_tokens;
81+
}
82+
}
83+
84+
let mut chunks = VecDeque::from(chunks);
85+
let first = chunks.pop_front().unwrap();
86+
this.buffer.extend(chunks);
87+
return Poll::Ready(Some(Ok(first)));
88+
}
89+
Err(error) => return Poll::Ready(Some(Err(error))),
90+
}
91+
}
92+
Poll::Ready(Some(Err(error))) => return Poll::Ready(Some(Err(error))),
93+
Poll::Ready(None) => return Poll::Ready(None),
94+
Poll::Pending => return Poll::Pending,
95+
}
96+
}
97+
}
98+
}
99+
100+
#[cfg(test)]
101+
mod tests {
102+
use std::sync::Arc;
103+
104+
use futures::StreamExt;
105+
use http::HeaderMap;
106+
107+
use super::HubChunkStream;
108+
use crate::gateway::{
109+
error::Result,
110+
provider_instance::ProviderAuth,
111+
traits::{ChatTransform, ProviderCapabilities, ProviderMeta, StreamReaderKind},
112+
types::openai::{
113+
ChatCompletionChunk, ChatCompletionChunkChoice, ChatCompletionChunkDelta,
114+
ChatCompletionUsage,
115+
},
116+
};
117+
118+
struct DummyProvider;
119+
120+
impl ProviderMeta for DummyProvider {
121+
fn name(&self) -> &'static str {
122+
"dummy"
123+
}
124+
125+
fn default_base_url(&self) -> &'static str {
126+
"https://example.com"
127+
}
128+
129+
fn stream_reader_kind(&self) -> StreamReaderKind {
130+
StreamReaderKind::Sse
131+
}
132+
133+
fn build_auth_headers(&self, _auth: &ProviderAuth) -> Result<HeaderMap> {
134+
Ok(HeaderMap::new())
135+
}
136+
}
137+
138+
impl ChatTransform for DummyProvider {
139+
fn transform_stream_chunk(
140+
&self,
141+
raw: &str,
142+
_state: &mut crate::gateway::traits::ChatStreamState,
143+
) -> Result<Vec<ChatCompletionChunk>> {
144+
match raw {
145+
"data: buffered" => Ok(vec![
146+
chunk_with_content("first", None),
147+
chunk_with_content("second", None),
148+
]),
149+
"data: usage" => Ok(vec![chunk_with_content("usage", Some((7, 11)))]),
150+
_ => Ok(vec![]),
151+
}
152+
}
153+
}
154+
155+
impl ProviderCapabilities for DummyProvider {}
156+
157+
#[tokio::test]
158+
async fn hub_chunk_stream_consumes_buffered_chunks_in_order() {
159+
let raw_stream = futures::stream::iter(vec![Ok("data: buffered".to_string())]);
160+
let mut stream = HubChunkStream::new(raw_stream, Arc::new(DummyProvider));
161+
162+
let first = stream.next().await.unwrap().unwrap();
163+
let second = stream.next().await.unwrap().unwrap();
164+
165+
assert_eq!(first.choices[0].delta.content.as_deref(), Some("first"));
166+
assert_eq!(second.choices[0].delta.content.as_deref(), Some("second"));
167+
assert!(stream.next().await.is_none());
168+
}
169+
170+
#[tokio::test]
171+
async fn hub_chunk_stream_accumulates_usage_from_emitted_chunks() {
172+
let raw_stream = futures::stream::iter(vec![Ok("data: usage".to_string())]);
173+
let mut stream = HubChunkStream::new(raw_stream, Arc::new(DummyProvider));
174+
175+
let chunk = stream.next().await.unwrap().unwrap();
176+
177+
assert_eq!(chunk.usage.as_ref().unwrap().prompt_tokens, 7);
178+
assert_eq!(stream.state.input_tokens, 7);
179+
assert_eq!(stream.state.output_tokens, 11);
180+
assert_eq!(stream.state.chunk_index, 1);
181+
}
182+
183+
fn chunk_with_content(content: &str, usage: Option<(u32, u32)>) -> ChatCompletionChunk {
184+
ChatCompletionChunk {
185+
id: "chatcmpl-test".into(),
186+
object: "chat.completion.chunk".into(),
187+
created: 1,
188+
model: "gpt-test".into(),
189+
choices: vec![ChatCompletionChunkChoice {
190+
index: 0,
191+
delta: ChatCompletionChunkDelta {
192+
role: None,
193+
content: Some(content.into()),
194+
tool_calls: None,
195+
},
196+
finish_reason: None,
197+
}],
198+
usage: usage.map(|(prompt_tokens, completion_tokens)| ChatCompletionUsage {
199+
prompt_tokens,
200+
completion_tokens,
201+
total_tokens: prompt_tokens + completion_tokens,
202+
prompt_tokens_details: None,
203+
completion_tokens_details: None,
204+
}),
205+
system_fingerprint: None,
206+
}
207+
}
208+
}

src/gateway/streams/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod hub;
2+
pub mod reader;
3+
4+
pub use hub::HubChunkStream;
5+
pub use reader::sse_reader;

0 commit comments

Comments
 (0)