Skip to content

Commit f2acd68

Browse files
Skobeltsynclaude
andcommitted
test(#1753): cancellation contract pins for all three streaming adapters
Verifies that cancelling the collecting coroutine of chatStream terminates promptly — well before the upstream response finishes. Surprising finding when writing these: the Kotlin Flow contract already handles cancellation correctly. emit() in the producer body suspends waiting for downstream channel capacity; when the collector cancels, emit throws CancellationException, the flow body unwinds, BufferedReader.useLines + .use { stream } close the InputStream cleanly. The IO-thread loop never enters its next blocking readLine after cancel fires. So no ensureActive() additions or other producer-side hardening was needed — the test serves as a regression guard against future changes that might bypass the Flow's cancellation hook (e.g., a synchronous collect-to-list step inside the producer). Three new tests, one per adapter: - OllamaClientCancellationTest: SlowNdjsonStream yielding 6 chunks at 80ms each (~480ms total). Cancel after first chunk; assert return within 250ms. - ClaudeClientCancellationTest: SlowSseStream with 6 content_block triples (~1.5s total). 400ms threshold accommodates SSE having multiple events per displayable text chunk. - OpenAiClientCancellationTest: SlowOpenAiSseStream with 6 deltas + terminator (~640ms total). 250ms threshold. All three green on first run — no impl changes needed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c00ef22 commit f2acd68

3 files changed

Lines changed: 264 additions & 0 deletions

File tree

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package agents_engine.model
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
import kotlinx.coroutines.Dispatchers
5+
import kotlinx.coroutines.cancelAndJoin
6+
import kotlinx.coroutines.flow.collect
7+
import kotlinx.coroutines.launch
8+
import kotlinx.coroutines.runBlocking
9+
import kotlinx.coroutines.withTimeout
10+
import java.io.IOException
11+
import java.io.InputStream
12+
import kotlin.test.Test
13+
import kotlin.test.assertTrue
14+
15+
// #1753 — regression guard for cancellation of ClaudeClient.chatStream.
16+
// Same contract pin as the Ollama analog: Kotlin Flow's channel-backed
17+
// emit propagates collector cancellation back into the producer body,
18+
// closing the InputStream before the next blocking read.
19+
20+
class ClaudeClientCancellationTest {
21+
22+
private class SlowSseStream : InputStream() {
23+
private val events = buildList {
24+
add("event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"usage\":{\"input_tokens\":3,\"output_tokens\":1}}}\n\n")
25+
(1..6).forEach { i ->
26+
add("event: content_block_start\ndata: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\"}}\n\n")
27+
add("event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"chunk$i \"}}\n\n")
28+
add("event: content_block_stop\ndata: {\"type\":\"content_block_stop\",\"index\":0}\n\n")
29+
}
30+
add("event: message_delta\ndata: {\"type\":\"message_delta\",\"usage\":{\"output_tokens\":12}}\n\n")
31+
add("event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n")
32+
}.toMutableList()
33+
private var bytes: ByteArray = events.removeFirst().toByteArray(Charsets.UTF_8)
34+
private var bytesPos: Int = 0
35+
36+
override fun read(): Int = if (bytesPos < bytes.size) bytes[bytesPos++].toInt() and 0xFF else loadNextOrEof()
37+
override fun read(b: ByteArray, off: Int, len: Int): Int {
38+
if (bytesPos >= bytes.size) {
39+
val advanced = loadNextOrEof()
40+
if (advanced < 0) return -1
41+
}
42+
val available = bytes.size - bytesPos
43+
val copyLen = minOf(len, available)
44+
System.arraycopy(bytes, bytesPos, b, off, copyLen)
45+
bytesPos += copyLen
46+
return copyLen
47+
}
48+
private fun loadNextOrEof(): Int {
49+
if (events.isEmpty()) return -1
50+
try { Thread.sleep(80) } catch (_: InterruptedException) { throw IOException("interrupted") }
51+
bytes = events.removeFirst().toByteArray(Charsets.UTF_8)
52+
bytesPos = 0
53+
return bytes[bytesPos].toInt() and 0xFF
54+
}
55+
}
56+
57+
@Test
58+
fun `cancelling Claude chatStream mid-collect terminates within tight window, not the full response`() = runBlocking {
59+
val stubbed = object : ClaudeClient(apiKey = "test-key", model = "test-model") {
60+
override fun sendChatStream(body: String, headers: Map<String, String>): InputStream = SlowSseStream()
61+
}
62+
63+
var received = 0
64+
val firstChunk = CompletableDeferred<Unit>()
65+
66+
val collectJob = launch(Dispatchers.Default) {
67+
stubbed.chatStream(listOf(LlmMessage("user", "Hi"))).collect { _ ->
68+
received++
69+
if (!firstChunk.isCompleted) firstChunk.complete(Unit)
70+
}
71+
}
72+
73+
withTimeout(2000) { firstChunk.await() }
74+
assertTrue(received >= 1, "expected at least one chunk before cancel; got $received")
75+
76+
val cancelStartNs = System.nanoTime()
77+
collectJob.cancelAndJoin()
78+
val cancelMs = (System.nanoTime() - cancelStartNs) / 1_000_000
79+
80+
// SSE has more events per "chunk" (start + delta + stop) so the
81+
// first text delta might arrive 2-3 stub-yields in. Bound at
82+
// 400ms — generous slack vs. the full stream's ~1.5s.
83+
assertTrue(
84+
cancelMs < 400,
85+
"expected cancel to return within ~400ms; took ${cancelMs}ms",
86+
)
87+
}
88+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package agents_engine.model
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
import kotlinx.coroutines.Dispatchers
5+
import kotlinx.coroutines.cancelAndJoin
6+
import kotlinx.coroutines.flow.collect
7+
import kotlinx.coroutines.launch
8+
import kotlinx.coroutines.runBlocking
9+
import kotlinx.coroutines.withTimeout
10+
import java.io.IOException
11+
import java.io.InputStream
12+
import kotlin.test.Test
13+
import kotlin.test.assertTrue
14+
15+
// #1753 — regression guard for cancellation of OllamaClient.chatStream.
16+
//
17+
// The Kotlin Flow contract (channel-backed by `flowOn(Dispatchers.IO)`)
18+
// already propagates collector cancellation back through `emit()` —
19+
// when the consuming coroutine cancels, emit throws CancellationException,
20+
// the flow body unwinds, and BufferedReader.useLines + .use { stream }
21+
// close the InputStream cleanly. The IO-thread loop never enters its
22+
// next blocking `readLine` after cancel fires.
23+
//
24+
// This test pins that contract: a slow-yielding stub produces six
25+
// chunks at 80ms each (~480ms total). Cancelling after the first
26+
// chunk arrives must return within ~250ms — proves we're not blocking
27+
// on the remaining chunks. If a future change adds a blocking sync
28+
// step that bypasses the flow's cancellation hook, this test fires.
29+
30+
class OllamaClientCancellationTest {
31+
32+
private class SlowNdjsonStream : InputStream() {
33+
private val chunks = (1..6).map {
34+
"""{"model":"t","message":{"role":"assistant","content":"chunk$it "},"done":false}""" + "\n"
35+
}.toMutableList().also {
36+
it += """{"model":"t","message":{"content":""},"done":true,"prompt_eval_count":3,"eval_count":6}""" + "\n"
37+
}
38+
private var bytes: ByteArray = chunks.removeFirst().toByteArray(Charsets.UTF_8)
39+
private var bytesPos: Int = 0
40+
41+
override fun read(): Int = if (bytesPos < bytes.size) bytes[bytesPos++].toInt() and 0xFF else loadNextOrEof()
42+
override fun read(b: ByteArray, off: Int, len: Int): Int {
43+
if (bytesPos >= bytes.size) {
44+
val advanced = loadNextOrEof()
45+
if (advanced < 0) return -1
46+
}
47+
val available = bytes.size - bytesPos
48+
val copyLen = minOf(len, available)
49+
System.arraycopy(bytes, bytesPos, b, off, copyLen)
50+
bytesPos += copyLen
51+
return copyLen
52+
}
53+
private fun loadNextOrEof(): Int {
54+
if (chunks.isEmpty()) return -1
55+
try { Thread.sleep(80) } catch (_: InterruptedException) { throw IOException("interrupted") }
56+
bytes = chunks.removeFirst().toByteArray(Charsets.UTF_8)
57+
bytesPos = 0
58+
return bytes[bytesPos].toInt() and 0xFF
59+
}
60+
}
61+
62+
@Test
63+
fun `cancelling chatStream mid-collect terminates within one-chunk's worth of time, not the full response`() = runBlocking {
64+
val stubbed = object : OllamaClient(model = "test-model") {
65+
override fun sendChatStream(body: String): InputStream = SlowNdjsonStream()
66+
}
67+
68+
var received = 0
69+
val firstChunk = CompletableDeferred<Unit>()
70+
71+
val collectJob = launch(Dispatchers.Default) {
72+
stubbed.chatStream(listOf(LlmMessage("user", "Hi"))).collect { _ ->
73+
received++
74+
if (!firstChunk.isCompleted) firstChunk.complete(Unit)
75+
}
76+
}
77+
78+
withTimeout(2000) { firstChunk.await() }
79+
assertTrue(received >= 1, "expected at least one chunk before cancel; got $received")
80+
81+
val cancelStartNs = System.nanoTime()
82+
collectJob.cancelAndJoin()
83+
val cancelMs = (System.nanoTime() - cancelStartNs) / 1_000_000
84+
85+
// Bound: under 250ms covers one-chunk-worth (~80ms) + cleanup
86+
// overhead. The full stream would take ~480ms — we must not wait
87+
// for it.
88+
assertTrue(
89+
cancelMs < 250,
90+
"expected cancel to return within ~250ms (one chunk's worth + slack); took ${cancelMs}ms",
91+
)
92+
}
93+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package agents_engine.model
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
import kotlinx.coroutines.Dispatchers
5+
import kotlinx.coroutines.cancelAndJoin
6+
import kotlinx.coroutines.flow.collect
7+
import kotlinx.coroutines.launch
8+
import kotlinx.coroutines.runBlocking
9+
import kotlinx.coroutines.withTimeout
10+
import java.io.IOException
11+
import java.io.InputStream
12+
import kotlin.test.Test
13+
import kotlin.test.assertTrue
14+
15+
// #1753 — regression guard for cancellation of OpenAiClient.chatStream.
16+
// Same contract pin: Kotlin Flow's emit propagates collector cancellation
17+
// back into the producer body, closing the InputStream before the next
18+
// blocking read.
19+
20+
class OpenAiClientCancellationTest {
21+
22+
private class SlowOpenAiSseStream : InputStream() {
23+
private val events = buildList {
24+
(1..6).forEach { i ->
25+
add("data: {\"id\":\"x\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"chunk$i \"},\"finish_reason\":null}]}\n\n")
26+
}
27+
add("data: {\"id\":\"x\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}]}\n\n")
28+
add("data: {\"id\":\"x\",\"choices\":[],\"usage\":{\"prompt_tokens\":3,\"completion_tokens\":12,\"total_tokens\":15}}\n\n")
29+
add("data: [DONE]\n\n")
30+
}.toMutableList()
31+
private var bytes: ByteArray = events.removeFirst().toByteArray(Charsets.UTF_8)
32+
private var bytesPos: Int = 0
33+
34+
override fun read(): Int = if (bytesPos < bytes.size) bytes[bytesPos++].toInt() and 0xFF else loadNextOrEof()
35+
override fun read(b: ByteArray, off: Int, len: Int): Int {
36+
if (bytesPos >= bytes.size) {
37+
val advanced = loadNextOrEof()
38+
if (advanced < 0) return -1
39+
}
40+
val available = bytes.size - bytesPos
41+
val copyLen = minOf(len, available)
42+
System.arraycopy(bytes, bytesPos, b, off, copyLen)
43+
bytesPos += copyLen
44+
return copyLen
45+
}
46+
private fun loadNextOrEof(): Int {
47+
if (events.isEmpty()) return -1
48+
try { Thread.sleep(80) } catch (_: InterruptedException) { throw IOException("interrupted") }
49+
bytes = events.removeFirst().toByteArray(Charsets.UTF_8)
50+
bytesPos = 0
51+
return bytes[bytesPos].toInt() and 0xFF
52+
}
53+
}
54+
55+
@Test
56+
fun `cancelling OpenAI chatStream mid-collect terminates within tight window, not the full response`() = runBlocking {
57+
val stubbed = object : OpenAiClient(apiKey = "test-key", model = "test-model") {
58+
override fun sendChatStream(body: String, headers: Map<String, String>): InputStream = SlowOpenAiSseStream()
59+
}
60+
61+
var received = 0
62+
val firstChunk = CompletableDeferred<Unit>()
63+
64+
val collectJob = launch(Dispatchers.Default) {
65+
stubbed.chatStream(listOf(LlmMessage("user", "Hi"))).collect { _ ->
66+
received++
67+
if (!firstChunk.isCompleted) firstChunk.complete(Unit)
68+
}
69+
}
70+
71+
withTimeout(2000) { firstChunk.await() }
72+
assertTrue(received >= 1, "expected at least one chunk before cancel; got $received")
73+
74+
val cancelStartNs = System.nanoTime()
75+
collectJob.cancelAndJoin()
76+
val cancelMs = (System.nanoTime() - cancelStartNs) / 1_000_000
77+
78+
assertTrue(
79+
cancelMs < 250,
80+
"expected cancel to return within ~250ms; took ${cancelMs}ms",
81+
)
82+
}
83+
}

0 commit comments

Comments
 (0)