Skip to content

Commit 5e9b420

Browse files
Skobeltsynclaude
andcommitted
feat(#1743): OpenAiClient native chatStream — SSE with [DONE] terminator
Third native streaming adapter. Closes the trio (Ollama / Claude / OpenAI). OpenAI's SSE is simpler than Anthropic's — all events are data:-only (no event: names), terminated by the literal data: [DONE]. OpenAI quirks handled: - Tool call id arrives in the FIRST delta for a tool_calls[].index only; subsequent deltas correlate by index. Aggregator caches index -> id after first sighting. - Arguments are concatenated string fragments across deltas; we parse the accumulated buffer at finish_reason == "tool_calls". - Token usage requires opt-in via stream_options.include_usage: true. OpenAI then sends a final usage-only delta with choices: [] and usage object. The End chunk carries it. Implementation mirrors ClaudeClient's shape: chatStream override + sendChatStream test seam + buildRequestJson(stream: Boolean = false) parameter, flowOn(Dispatchers.IO). TDD red-first: two non-live tests with hardcoded SSE (text-only, tool-call with chunked arguments). Plus a live integration test: OpenAiClientChatStreamLiveTest: model=gpt-4o-mini chunks=19 firstMs=3199 lastMs=3401 gapMs=202 assembled="1 2 3 4 5 6 7 8 9 10" All three providers now stream natively at the wire level: - Ollama: 19 chunks / 84ms gap (NDJSON) - Claude: 2 chunks / 27ms gap (SSE, fastest) - OpenAI: 19 chunks / 202ms gap (SSE) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 340c0d1 commit 5e9b420

3 files changed

Lines changed: 310 additions & 2 deletions

File tree

src/main/kotlin/agents_engine/model/OpenAiClient.kt

Lines changed: 140 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,20 @@ package agents_engine.model
22

33
import agents_engine.generation.LenientJsonParser
44
import agents_engine.generation.jsonSchema
5+
import java.io.BufferedReader
6+
import java.io.InputStream
7+
import java.io.InputStreamReader
58
import java.net.URI
69
import java.net.http.HttpClient
710
import java.net.http.HttpRequest
811
import java.net.http.HttpResponse
912
import kotlin.time.Duration
1013
import kotlin.time.Duration.Companion.seconds
1114
import kotlin.time.toJavaDuration
15+
import kotlinx.coroutines.Dispatchers
16+
import kotlinx.coroutines.flow.Flow
17+
import kotlinx.coroutines.flow.flow
18+
import kotlinx.coroutines.flow.flowOn
1219

1320
/**
1421
* OpenAI Chat Completions adapter (#1656). Mirrors [OllamaClient] and
@@ -59,6 +66,134 @@ open class OpenAiClient(
5966
return parseResponse(responseBody)
6067
}
6168

69+
/**
70+
* #1743 — native SSE streaming. OpenAI's protocol is `data:`-only
71+
* (no `event:` names), terminated by the literal `data: [DONE]`.
72+
*
73+
* Tool-call correlation: the `id` (`call_*`) arrives in the FIRST
74+
* delta for a given `tool_calls[].index`; subsequent deltas omit
75+
* it. The aggregator caches `index -> id` after first sighting.
76+
*
77+
* Arguments arrive as concatenated string fragments. We emit
78+
* `LlmChunk.ToolCallArgumentsDelta` per non-empty fragment and
79+
* accumulate into a buffer; on `finish_reason: "tool_calls"` we
80+
* parse the buffer and emit `LlmChunk.ToolCallFinished`.
81+
*
82+
* Token usage requires `stream_options.include_usage: true` (set in
83+
* `buildRequestJson(stream=true)`). OpenAI then sends a final
84+
* usage-only delta with `choices: []` and `usage: {...}`. We capture
85+
* it and emit `LlmChunk.End(usage)` when `[DONE]` arrives.
86+
*/
87+
override suspend fun chatStream(messages: List<LlmMessage>): Flow<LlmChunk> {
88+
val body = buildRequestJson(messages, stream = true)
89+
val headers = mapOf(
90+
"Authorization" to "Bearer $apiKey",
91+
"content-type" to "application/json",
92+
)
93+
return flow {
94+
sendChatStream(body, headers).use { stream ->
95+
parseSseStream(stream, this)
96+
}
97+
}.flowOn(Dispatchers.IO)
98+
}
99+
100+
/** Test seam — subclasses override to stub the streaming InputStream. */
101+
internal open fun sendChatStream(body: String, headers: Map<String, String>): InputStream {
102+
val builder = HttpRequest.newBuilder()
103+
.uri(URI.create("$baseUrl/v1/chat/completions"))
104+
.timeout(requestTimeout.toJavaDuration())
105+
.POST(HttpRequest.BodyPublishers.ofString(body))
106+
headers.forEach { (k, v) -> builder.header(k, v) }
107+
val response = http.send(builder.build(), HttpResponse.BodyHandlers.ofInputStream())
108+
return response.body()
109+
}
110+
111+
/** Per-tool-call streaming state. */
112+
private data class ToolCallState(
113+
var id: String? = null,
114+
var name: String? = null,
115+
val argsBuilder: StringBuilder = StringBuilder(),
116+
)
117+
118+
private suspend fun parseSseStream(stream: InputStream, collector: kotlinx.coroutines.flow.FlowCollector<LlmChunk>) {
119+
// Keyed by `tool_calls[].index` within the choice.
120+
val toolStates = mutableMapOf<Int, ToolCallState>()
121+
var usage: TokenUsage? = null
122+
123+
BufferedReader(InputStreamReader(stream, Charsets.UTF_8)).useLines { lines ->
124+
for (line in lines) {
125+
if (line.isBlank() || !line.startsWith("data:")) continue
126+
val payload = line.removePrefix("data:").trim()
127+
if (payload == "[DONE]") {
128+
collector.emit(LlmChunk.End(usage))
129+
return@useLines
130+
}
131+
@Suppress("UNCHECKED_CAST")
132+
val data = LenientJsonParser.parse(payload) as? Map<String, Any?> ?: continue
133+
// Final usage-only delta: choices is empty, usage non-null.
134+
(data["usage"] as? Map<*, *>)?.let { u ->
135+
val prompt = (u["prompt_tokens"] as? Number)?.toInt()
136+
val completion = (u["completion_tokens"] as? Number)?.toInt()
137+
if (prompt != null && completion != null) usage = TokenUsage(prompt, completion)
138+
}
139+
val choices = data["choices"] as? List<*> ?: continue
140+
val choice = choices.firstOrNull() as? Map<*, *> ?: continue
141+
val delta = choice["delta"] as? Map<*, *>
142+
val finishReason = choice["finish_reason"] as? String
143+
144+
// Text content delta.
145+
(delta?.get("content") as? String)?.takeIf { it.isNotEmpty() }?.let {
146+
collector.emit(LlmChunk.TextDelta(it))
147+
}
148+
149+
// Tool-call deltas.
150+
val rawToolCalls = delta?.get("tool_calls") as? List<*>
151+
rawToolCalls?.forEach { tc ->
152+
val tcMap = tc as? Map<*, *> ?: return@forEach
153+
val tcIndex = (tcMap["index"] as? Number)?.toInt() ?: return@forEach
154+
val state = toolStates.getOrPut(tcIndex) { ToolCallState() }
155+
val newId = tcMap["id"] as? String
156+
val fn = tcMap["function"] as? Map<*, *>
157+
val newName = fn?.get("name") as? String
158+
val argsFragment = fn?.get("arguments") as? String
159+
160+
// First sighting: id + name typically present together.
161+
if (state.id == null && newId != null) {
162+
state.id = newId
163+
if (newName != null) state.name = newName
164+
collector.emit(LlmChunk.ToolCallStarted(callId = newId, toolName = newName ?: ""))
165+
} else if (newName != null && state.name == null) {
166+
state.name = newName
167+
}
168+
169+
if (!argsFragment.isNullOrEmpty()) {
170+
state.argsBuilder.append(argsFragment)
171+
val callId = state.id
172+
if (callId != null) {
173+
collector.emit(LlmChunk.ToolCallArgumentsDelta(callId = callId, deltaJson = argsFragment))
174+
}
175+
}
176+
}
177+
178+
// finish_reason == "tool_calls" marks completion of the
179+
// assistant turn's tool-call sequence; emit Finished for
180+
// each accumulated call.
181+
if (finishReason == "tool_calls") {
182+
toolStates.values.forEach { state ->
183+
val callId = state.id ?: return@forEach
184+
val argsString = state.argsBuilder.toString()
185+
val parsed = if (argsString.isBlank()) emptyMap()
186+
else parseToolArguments(argsString).arguments
187+
collector.emit(LlmChunk.ToolCallFinished(callId = callId, arguments = parsed))
188+
}
189+
toolStates.clear()
190+
}
191+
}
192+
// EOF without [DONE]: emit End with whatever usage we captured.
193+
collector.emit(LlmChunk.End(usage))
194+
}
195+
}
196+
62197
/** Test seam — subclasses override to stub HTTP without a server. */
63198
internal open fun sendChat(body: String, headers: Map<String, String>): String {
64199
val builder = HttpRequest.newBuilder()
@@ -78,7 +213,7 @@ open class OpenAiClient(
78213
return String(bytes, Charsets.UTF_8)
79214
}
80215

81-
internal fun buildRequestJson(messages: List<LlmMessage>): String {
216+
internal fun buildRequestJson(messages: List<LlmMessage>, stream: Boolean = false): String {
82217
val pendingToolCallIds: ArrayDeque<String> = ArrayDeque()
83218
var toolCallCounter = 0
84219

@@ -121,7 +256,10 @@ open class OpenAiClient(
121256
""","tools":[$defs]"""
122257
} else ""
123258

124-
return """{"model":${model.toJsonString()},"max_tokens":$maxTokens,"temperature":$temperature,"messages":[${messageObjects.joinToString(",")}]$toolsField}"""
259+
// #1743: stream_options.include_usage opts into a final usage-only
260+
// delta after finish_reason — required to get TokenUsage on stream.
261+
val streamField = if (stream) ""","stream":true,"stream_options":{"include_usage":true}""" else ""
262+
return """{"model":${model.toJsonString()},"max_tokens":$maxTokens,"temperature":$temperature$streamField,"messages":[${messageObjects.joinToString(",")}]$toolsField}"""
125263
}
126264

127265
internal fun parseResponse(body: String): LlmResponse {
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package agents_engine.model
2+
3+
import kotlinx.coroutines.flow.collect
4+
import kotlinx.coroutines.runBlocking
5+
import org.junit.jupiter.api.Assumptions.assumeTrue
6+
import org.junit.jupiter.api.Tag
7+
import org.junit.jupiter.api.Test
8+
import java.io.File
9+
import kotlin.test.assertNotNull
10+
import kotlin.test.assertTrue
11+
12+
/**
13+
* #1743 — live integration test for OpenAiClient.chatStream against the
14+
* real OpenAI API. Requires an API key at `.secrets/openai-key` or in
15+
* `OPENAI_API_KEY`. Tagged `live-llm` — runs via `./gradlew integrationTest`.
16+
*/
17+
class OpenAiClientChatStreamLiveTest {
18+
19+
private val apiKey: String? = loadKey()
20+
private val openAiModel: String = System.getenv("OPENAI_TEST_MODEL") ?: "gpt-4o-mini"
21+
22+
@Tag("live-llm")
23+
@Test
24+
fun `native chatStream against OpenAI emits multiple TextDelta chunks incrementally with token usage`() = runBlocking {
25+
assumeTrue(apiKey != null, "skipping: no OpenAI key at .secrets/openai-key or OPENAI_API_KEY")
26+
27+
val client = OpenAiClient(apiKey = apiKey!!, model = openAiModel, temperature = 0.0)
28+
29+
val startNs = System.nanoTime()
30+
val arrivals = mutableListOf<Pair<Long, LlmChunk>>()
31+
client.chatStream(
32+
listOf(
33+
LlmMessage(
34+
role = "user",
35+
content = "Count from 1 to 10 separated by spaces. Output ONLY the numbers, nothing else.",
36+
),
37+
),
38+
).collect { chunk ->
39+
arrivals += ((System.nanoTime() - startNs) / 1_000_000) to chunk
40+
}
41+
42+
val textDeltas = arrivals.filter { it.second is LlmChunk.TextDelta }
43+
val endChunk = arrivals.last().second as? LlmChunk.End
44+
?: error("last chunk must be End; got: ${arrivals.last().second}")
45+
46+
assertTrue(
47+
textDeltas.size > 1,
48+
"expected multiple TextDelta chunks (proves wire-level SSE streaming); got ${textDeltas.size}",
49+
)
50+
51+
val firstMs = textDeltas.first().first
52+
val lastMs = textDeltas.last().first
53+
val gapMs = lastMs - firstMs
54+
assertTrue(
55+
gapMs >= 20,
56+
"expected at least 20ms between first and last TextDelta; first=${firstMs}ms last=${lastMs}ms gap=${gapMs}ms",
57+
)
58+
59+
assertNotNull(endChunk.tokenUsage, "End chunk must carry TokenUsage (stream_options.include_usage)")
60+
assertTrue(endChunk.tokenUsage!!.completionTokens > 0)
61+
62+
val assembled = textDeltas.joinToString("") { (it.second as LlmChunk.TextDelta).text }
63+
listOf("1", "2", "3").forEach { d ->
64+
assertTrue(d in assembled, "assembled output should contain '$d'; got: \"$assembled\"")
65+
}
66+
67+
println(
68+
"OpenAiClientChatStreamLiveTest: model=$openAiModel chunks=${textDeltas.size} " +
69+
"firstMs=$firstMs lastMs=$lastMs gapMs=$gapMs assembled=\"$assembled\""
70+
)
71+
}
72+
73+
private fun loadKey(): String? {
74+
val envKey = System.getenv("OPENAI_API_KEY")
75+
if (!envKey.isNullOrBlank()) return envKey
76+
val file = File(".secrets/openai-key")
77+
return if (file.exists()) file.readText().trim().ifBlank { null } else null
78+
}
79+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package agents_engine.model
2+
3+
import kotlinx.coroutines.flow.toList
4+
import kotlinx.coroutines.test.runTest
5+
import kotlin.test.Test
6+
import kotlin.test.assertEquals
7+
import kotlin.test.assertIs
8+
import kotlin.test.assertTrue
9+
10+
// #1743 — non-live unit coverage for OpenAiClient.chatStream SSE parsing.
11+
// OpenAI's SSE is `data:`-only (no `event:` names), terminated by the
12+
// literal `data: [DONE]`. Tool calls correlate across deltas by
13+
// `tool_calls[].index`; `id` arrives in the FIRST delta only.
14+
15+
class OpenAiClientChatStreamTest {
16+
17+
@Test
18+
fun `text-only SSE stream emits TextDelta chunks plus End with usage from final delta`() = runTest {
19+
val sse = buildString {
20+
appendLine("""data: {"id":"x","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}""")
21+
appendLine()
22+
appendLine("""data: {"id":"x","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}""")
23+
appendLine()
24+
appendLine("""data: {"id":"x","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}""")
25+
appendLine()
26+
appendLine("""data: {"id":"x","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}""")
27+
appendLine()
28+
appendLine("""data: {"id":"x","choices":[],"usage":{"prompt_tokens":11,"completion_tokens":6,"total_tokens":17}}""")
29+
appendLine()
30+
appendLine("""data: [DONE]""")
31+
appendLine()
32+
}
33+
34+
val chunks = stubbedOpenAi(sse).chatStream(listOf(LlmMessage("user", "Hi"))).toList()
35+
36+
assertEquals(3, chunks.size, "expected 2 TextDelta + End; got: $chunks")
37+
val d1 = chunks[0]; assertIs<LlmChunk.TextDelta>(d1); assertEquals("Hello", d1.text)
38+
val d2 = chunks[1]; assertIs<LlmChunk.TextDelta>(d2); assertEquals(" world", d2.text)
39+
val end = chunks[2]; assertIs<LlmChunk.End>(end)
40+
assertEquals(TokenUsage(promptTokens = 11, completionTokens = 6), end.tokenUsage)
41+
}
42+
43+
@Test
44+
fun `tool-call SSE stream emits Started with call_id, ArgumentsDelta per chunk, Finished with parsed args`() = runTest {
45+
// The id only arrives in the first delta; subsequent deltas
46+
// correlate via tool_calls[].index. Aggregator must remember the
47+
// id across deltas.
48+
val sse = buildString {
49+
appendLine("""data: {"id":"x","choices":[{"index":0,"delta":{"role":"assistant","tool_calls":[{"index":0,"id":"call_abc","type":"function","function":{"name":"get_weather","arguments":""}}]},"finish_reason":null}]}""")
50+
appendLine()
51+
appendLine("""data: {"id":"x","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"location"}}]},"finish_reason":null}]}""")
52+
appendLine()
53+
appendLine("""data: {"id":"x","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\":\"SF\"}"}}]},"finish_reason":null}]}""")
54+
appendLine()
55+
appendLine("""data: {"id":"x","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}""")
56+
appendLine()
57+
appendLine("""data: {"id":"x","choices":[],"usage":{"prompt_tokens":42,"completion_tokens":18,"total_tokens":60}}""")
58+
appendLine()
59+
appendLine("""data: [DONE]""")
60+
appendLine()
61+
}
62+
63+
val chunks = stubbedOpenAi(sse).chatStream(listOf(LlmMessage("user", "weather"))).toList()
64+
65+
val started = chunks.filterIsInstance<LlmChunk.ToolCallStarted>().single()
66+
assertEquals("call_abc", started.callId, "callId must be OpenAI's call_* id from the first delta")
67+
assertEquals("get_weather", started.toolName)
68+
69+
val deltas = chunks.filterIsInstance<LlmChunk.ToolCallArgumentsDelta>().filter { it.callId == "call_abc" }
70+
// Three argument-bearing deltas: the initial empty arguments string,
71+
// then two non-empty fragments. Aggregator may or may not skip the
72+
// empty one; we accept either shape but assert the non-empty deltas
73+
// appear with the right content.
74+
val deltaJsons = deltas.map { it.deltaJson }
75+
assertTrue("""{"location""" in deltaJsons, "expected first non-empty args fragment; got deltas: $deltaJsons")
76+
assertTrue("""":"SF"}""" in deltaJsons, "expected second non-empty args fragment; got deltas: $deltaJsons")
77+
78+
val finished = chunks.filterIsInstance<LlmChunk.ToolCallFinished>().single()
79+
assertEquals("call_abc", finished.callId)
80+
assertEquals(mapOf("location" to "SF"), finished.arguments)
81+
82+
val end = chunks.filterIsInstance<LlmChunk.End>().single()
83+
assertEquals(TokenUsage(promptTokens = 42, completionTokens = 18), end.tokenUsage)
84+
}
85+
86+
private fun stubbedOpenAi(sse: String): OpenAiClient =
87+
object : OpenAiClient(apiKey = "test-key", model = "test-model") {
88+
override fun sendChatStream(body: String, headers: Map<String, String>): java.io.InputStream =
89+
sse.byteInputStream(Charsets.UTF_8)
90+
}
91+
}

0 commit comments

Comments
 (0)