Skip to content

Commit 0a503d0

Browse files
keyzouclaude
andauthored
fix(provider,claude): handle prompt stream interrupt on session stop (#1365)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d4a7e1a commit 0a503d0

2 files changed

Lines changed: 70 additions & 0 deletions

File tree

apps/server/src/provider/Layers/ClaudeAdapter.test.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1153,6 +1153,73 @@ describe("ClaudeAdapterLive", () => {
11531153
);
11541154
});
11551155

1156+
it.effect("stopSession does not throw into the SDK prompt consumer", () => {
1157+
// The SDK consumes user messages via `for await (... of prompt)`.
1158+
// Stopping a session must end that loop cleanly — not throw an error.
1159+
//
1160+
// FakeClaudeQuery.close() masks this by resolving pending iterators
1161+
// before the shutdown propagates. Override it to match real SDK behavior
1162+
// where close() does not resolve the prompt consumer.
1163+
const query = new FakeClaudeQuery();
1164+
(query as { close: () => void }).close = () => {
1165+
query.closeCalls += 1;
1166+
};
1167+
1168+
let promptConsumerError: unknown = undefined;
1169+
1170+
const layer = makeClaudeAdapterLive({
1171+
createQuery: (input) => {
1172+
// Simulate the SDK consuming the prompt iterable
1173+
(async () => {
1174+
try {
1175+
for await (const _message of input.prompt) {
1176+
/* SDK processes user messages */
1177+
}
1178+
} catch (error) {
1179+
promptConsumerError = error;
1180+
}
1181+
})();
1182+
return query;
1183+
},
1184+
}).pipe(
1185+
Layer.provideMerge(ServerConfig.layerTest("/tmp/claude-adapter-test", "/tmp")),
1186+
Layer.provideMerge(NodeServices.layer),
1187+
);
1188+
1189+
return Effect.gen(function* () {
1190+
const adapter = yield* ClaudeAdapter;
1191+
1192+
const runtimeEventsFiber = Effect.runFork(
1193+
Stream.runForEach(adapter.streamEvents, () => Effect.void),
1194+
);
1195+
1196+
yield* adapter.startSession({
1197+
threadId: THREAD_ID,
1198+
provider: "claudeAgent",
1199+
runtimeMode: "full-access",
1200+
});
1201+
1202+
yield* adapter.stopSession(THREAD_ID);
1203+
1204+
yield* Effect.yieldNow;
1205+
yield* Effect.yieldNow;
1206+
yield* Effect.yieldNow;
1207+
yield* Effect.promise(() => new Promise((resolve) => setTimeout(resolve, 50)));
1208+
1209+
runtimeEventsFiber.interruptUnsafe();
1210+
1211+
assert.equal(
1212+
promptConsumerError,
1213+
undefined,
1214+
`Prompt consumer should not receive a thrown error on session stop, ` +
1215+
`but got: "${promptConsumerError instanceof Error ? promptConsumerError.message : String(promptConsumerError)}"`,
1216+
);
1217+
}).pipe(
1218+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1219+
Effect.provide(layer),
1220+
);
1221+
});
1222+
11561223
it.effect("forwards Claude task progress summaries for subagent updates", () => {
11571224
const harness = makeHarness();
11581225
return Effect.gen(function* () {

apps/server/src/provider/Layers/ClaudeAdapter.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2258,6 +2258,9 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) {
22582258
const prompt = Stream.fromQueue(promptQueue).pipe(
22592259
Stream.filter((item) => item.type === "message"),
22602260
Stream.map((item) => item.message),
2261+
Stream.catchCause((cause) =>
2262+
Cause.hasInterruptsOnly(cause) ? Stream.empty : Stream.failCause(cause),
2263+
),
22612264
Stream.toAsyncIterable,
22622265
);
22632266

0 commit comments

Comments
 (0)