Skip to content

Commit e8b7437

Browse files
Skobeltsynclaude
andcommitted
test(#1738): step-2 closeout — cancellation test + honest docs update
AgentSessionCancellationTest.kt — proves that cancelling a session's events collector terminates promptly (under 500ms) even while an implementedBy skill is still in a 2-second Thread.sleep. Step-2 honesty noted in test docstring: the invocation itself can't be cancelled mid-body because implementedBy lambdas have no suspension points. Step 3's executeAgentic rewire closes that gap by making the HTTP path a chatStream suspend function — cancellation will then propagate through the suspension. README Limitations — gains a session-surface entry that documents what ships (bracket events) vs what's deferred to step 3 (Token / ToolCall* emission, agentic-loop rewire) vs the partial cancellation contract. The chatStream entry stays; this is a sibling. docs/roadmap.md Phase 2 Secondary — adds [x] for the streaming session surface (#1736) with #1737/#1738 integration coverage called out, plus a new [ ] line for the agentic-loop rewire so the v0.5.0 step-3 work has a roadmap home. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 89d892e commit e8b7437

3 files changed

Lines changed: 80 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ What the framework does **not** enforce — your responsibility:
160160
- **No incoming auth on `McpServer`** — outgoing client supports Bearer; the server does not validate credentials. Suitable for trusted-network deployments only.
161161
- **No Origin header validation on MCP HTTP** — deferred until the MCP-server hardening pass.
162162
- **No per-adapter native streaming yet**`LlmChunk` sealed type + `ModelClient.chatStream(messages): Flow<LlmChunk>` default impl landed in v0.4.6 (#1722), so `chatStream` is callable on every `ModelClient`. The default wraps `chat()` and emits one `TextDelta` + `End` (or `ToolCallStarted` / `ArgumentsDelta` / `Finished` / `End` for tool turns), so non-streaming consumers see ordered chunks but no real-time partial output. Native streaming overrides (Anthropic SSE, OpenAI SSE, Ollama `stream: true`) are next on the Phase 2 list — see [docs/premortem-0.5.0-streaming.md](docs/premortem-0.5.0-streaming.md).
163+
- **Streaming session surface ships now; token-level streaming inside the agentic loop is in progress.** `agent.session(input): AgentSession<OUT>` exposes `events: Flow<AgentEvent<OUT>>` with `SkillStarted` / `SkillCompleted` / `Completed<OUT>` / `Failed` bracket events (#1736). The `Token` / `ToolCallStarted` / `ToolCallArgumentsDelta` / `ToolCallFinished` event types are defined but not yet emitted — `executeAgentic` still calls `chat()` synchronously. Step 3 rewires the agentic loop onto a `FlowCollector<AgentEvent>` so mid-loop events fire as the LLM streams. Cancellation is partial today (Flow collection cancels promptly; `implementedBy` skill bodies have no suspend points and may run to completion in the background — step 3 closes that gap via the native streaming HTTP path).
163164
- **No native binary** — JVM-only (≥ JDK 21). GraalVM and `jlink` bundles are Phase 2 priorities.
164165
- **No A2A protocol yet** — agent-to-agent over network (Phase 2 / 3).
165166
- **Inline-tool-call fallback model variance** — small Ollama models (e.g. `gemma3:4b`) reliably emit single tool calls via the inline format but may produce thin final-turn text after multi-step tool sequences. For multi-step reasoning, a tool-native model (`gpt-oss:20b-cloud` and similar) is the better fit.

docs/roadmap.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
- [x] Agent memory — `MemoryBank`, `memory_read`/`memory_write`/`memory_search` auto-injected tools
5050
- [ ] `.spawn {}` — independent sub-agent lifecycle, `AgentHandle<OUT>`, parent-managed join
5151
- [x] Streaming foundation — `LlmChunk` sealed type (`TextDelta` / `ToolCallStarted` / `ToolCallArgumentsDelta` / `ToolCallFinished` / `End`) + `ModelClient.chatStream(messages): Flow<LlmChunk>` with a default impl that wraps `chat()` so non-streaming providers keep working unchanged. Provider-native streaming (Anthropic SSE, OpenAI SSE, Ollama `stream: true`) overrides land per-adapter. `LlmChunk` stays narrow — no agentic concepts like `skillName` / `agentId` (#1722)
52+
- [x] Streaming session surface — `AgentEvent` sealed hierarchy (`Token` / `ToolCallStarted` / `ToolCallArgumentsDelta` / `ToolCallFinished` / `SkillStarted` / `SkillCompleted` / `Completed<OUT>` / `Failed`, every event carrying `agentId`), `AgentSession<OUT>` (cold `events: Flow<AgentEvent<OUT>>` + `suspend fun await(): OUT`), and free function `Agent<IN, OUT>.session(input): AgentSession<OUT>` (#1736). Existing `Agent.invokeSuspend` delegates to a new internal `invokeSuspendForSession` with a no-op skill listener — backward-compat byte-for-byte. Today emits only bracket events (`SkillStarted` / `SkillCompleted` / `Completed` / `Failed`) — the `Token` / `ToolCall*` subtypes are defined and ready for consumers but not yet emitted (next entry). Integration coverage: failure-path identity-preserved `cause`, concurrent sessions, agentic-stub bracketing, live-LLM π-to-20-decimals against Ollama (#1737), and prompt-cancellation of the events collector (#1738).
53+
- [ ] Agentic-loop rewire onto `FlowCollector<AgentEvent>``Token` and `ToolCall*` events fire mid-loop; cancellation propagates into `chatStream` HTTP suspension; `tokensUsed` gets threaded through `SkillCompleted` / `Completed`. Step 3 of the v0.5.0 plan.
5254
- [ ] Per-adapter native streaming overrides — Anthropic SSE, OpenAI SSE, Ollama `stream: true` — emit real partial chunks instead of the default `chat()`-wrap. See [v0.5.0 streaming premortem](premortem-0.5.0-streaming.md)
5355
- [ ] `Flow<PipelineEvent>` for reactive UIs + Pipeline-level events (`StageStarted`, `PipelineCompleted`, etc) — built on top of `LlmChunk`; depends on sub-agents and sessions
5456
- [ ] **Multimodal input** — vision and audio content blocks on LLM messages.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package agents_engine.runtime.events
2+
3+
import agents_engine.core.agent
4+
import kotlinx.coroutines.CompletableDeferred
5+
import kotlinx.coroutines.cancelAndJoin
6+
import kotlinx.coroutines.flow.collect
7+
import kotlinx.coroutines.launch
8+
import kotlinx.coroutines.runBlocking
9+
import kotlinx.coroutines.withTimeout
10+
import kotlin.test.Test
11+
import kotlin.test.assertTrue
12+
13+
// #1738 — step-2 closeout: prove that cancelling a session's events
14+
// collector terminates the collect promptly, even when the underlying
15+
// `implementedBy` skill is mid-execution.
16+
//
17+
// **Step-2 honesty note.** Coroutine cancellation can only fire at
18+
// suspension points. An `implementedBy` lambda is `(IN) -> OUT` — pure
19+
// synchronous code with no suspension points — so the *invocation*
20+
// itself may run to completion in the background after a cancel. What
21+
// we CAN verify in step 2:
22+
// - The collector job's `cancelAndJoin()` returns promptly (well under
23+
// the skill's synthetic sleep duration).
24+
// - Subsequent `collect`s on the cancelled session don't deliver
25+
// additional events to the cancelled job.
26+
//
27+
// What step 3 will add (and what this test will be extended to cover):
28+
// once `executeAgentic` is rewired onto a `FlowCollector<AgentEvent>`,
29+
// the HTTP call inside the loop becomes a `chatStream(...)` suspend
30+
// function — cancellation propagates through that suspend boundary and
31+
// the actual invocation stops. Today, only the *Flow surface* respects
32+
// cancellation; the agentic loop's body doesn't yet.
33+
34+
class AgentSessionCancellationTest {
35+
36+
@Test
37+
fun `cancelling the events collect terminates promptly even while a slow skill is mid-execution`() = runBlocking {
38+
val skillEntered = CompletableDeferred<Unit>()
39+
40+
// 2-second synthetic delay — large enough that "cancel returns
41+
// before the skill finishes" is unambiguously measurable.
42+
val slowAgent = agent<String, String>("slow") {
43+
skills {
44+
skill<String, String>("work", "Synthetic slow work to exercise cancellation") {
45+
implementedBy {
46+
skillEntered.complete(Unit)
47+
Thread.sleep(2000)
48+
"done"
49+
}
50+
}
51+
}
52+
}
53+
54+
val session = slowAgent.session("input")
55+
val collectJob = launch {
56+
session.events.collect { /* receive but don't act */ }
57+
}
58+
59+
// Wait for the skill to enter — this proves the invocation
60+
// actually started before we cancel.
61+
withTimeout(1000) { skillEntered.await() }
62+
63+
val cancelStartNs = System.nanoTime()
64+
collectJob.cancelAndJoin()
65+
val cancelMs = (System.nanoTime() - cancelStartNs) / 1_000_000
66+
67+
// The skill's Thread.sleep continues to run in the background
68+
// (step-2 gap, documented above). What we assert: the cancel
69+
// returned promptly — under half a second is generous slack
70+
// for CI noise; the skill's sleep is 2 seconds. If cancel were
71+
// waiting on the skill, this would never hold.
72+
assertTrue(
73+
cancelMs < 500,
74+
"cancel should return well under the skill's 2-second sleep; took ${cancelMs}ms",
75+
)
76+
}
77+
}

0 commit comments

Comments
 (0)