Skip to content

Commit 851024d

Browse files
Preserve Claude stream interruption errors
- Split Claude stream failures from interrupts - Keep interrupted turns from being reported as generic failures
1 parent 4af9463 commit 851024d

1 file changed

Lines changed: 48 additions & 50 deletions

File tree

apps/server/src/provider/Layers/ClaudeAdapter.ts

Lines changed: 48 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import {
5555
FileSystem,
5656
Fiber,
5757
Layer,
58+
Option,
5859
Queue,
5960
Random,
6061
Ref,
@@ -146,7 +147,7 @@ interface ClaudeSessionContext {
146147
session: ProviderSession;
147148
readonly promptQueue: Queue.Queue<PromptQueueItem>;
148149
readonly query: ClaudeQueryRuntime;
149-
streamFiber: Fiber.Fiber<void, Error> | undefined;
150+
streamFiber: Fiber.Fiber<void, ClaudeStreamError> | undefined;
150151
readonly startedAt: string;
151152
readonly basePermissionMode: PermissionMode | undefined;
152153
currentApiModelId: string | undefined;
@@ -183,10 +184,22 @@ export interface ClaudeAdapterLiveOptions {
183184
readonly nativeEventLogger?: EventNdjsonLogger;
184185
}
185186

186-
class ClaudeStreamError extends Schema.TaggedErrorClass<ClaudeStreamError>()("ClaudeStreamError", {
187-
message: Schema.String,
188-
cause: Schema.optional(Schema.Defect),
189-
}) {}
187+
class ClaudeStreamInterruptedError extends Schema.TaggedErrorClass<ClaudeStreamInterruptedError>()(
188+
"ClaudeStreamInterruptedError",
189+
{
190+
message: Schema.String,
191+
cause: Schema.optional(Schema.Defect),
192+
},
193+
) {}
194+
class ClaudeStreamFailedError extends Schema.TaggedErrorClass<ClaudeStreamFailedError>()(
195+
"ClaudeStreamFailedError",
196+
{
197+
message: Schema.String,
198+
cause: Schema.optional(Schema.Defect),
199+
},
200+
) {}
201+
const isClaudeStreamInterruptedError = Schema.is(ClaudeStreamInterruptedError);
202+
type ClaudeStreamError = ClaudeStreamInterruptedError | ClaudeStreamFailedError;
190203

191204
function isUuid(value: string): boolean {
192205
return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(value);
@@ -196,17 +209,6 @@ function isSyntheticClaudeThreadId(value: string): boolean {
196209
return value.startsWith("claude-thread-");
197210
}
198211

199-
function normalizeClaudeStreamMessages(cause: Cause.Cause<Error>): ReadonlyArray<string> {
200-
const errors = Cause.prettyErrors(cause)
201-
.map((error) => error.message.trim())
202-
.filter((message) => message.length > 0);
203-
if (errors.length > 0) {
204-
return errors;
205-
}
206-
207-
return [Cause.pretty(cause)];
208-
}
209-
210212
function getEffectiveClaudeCodeEffort(
211213
effort: ClaudeCodeEffort | null | undefined,
212214
): Exclude<ClaudeCodeEffort, "ultrathink"> | null {
@@ -225,22 +227,6 @@ function isClaudeInterruptedMessage(message: string): boolean {
225227
);
226228
}
227229

228-
function isClaudeInterruptedCause(cause: Cause.Cause<Error>): boolean {
229-
return (
230-
Cause.hasInterruptsOnly(cause) ||
231-
normalizeClaudeStreamMessages(cause).some(isClaudeInterruptedMessage)
232-
);
233-
}
234-
235-
function messageFromClaudeStreamCause(cause: Cause.Cause<Error>, fallback: string): string {
236-
return normalizeClaudeStreamMessages(cause)[0] ?? fallback;
237-
}
238-
239-
function interruptionMessageFromClaudeCause(cause: Cause.Cause<Error>): string {
240-
const message = messageFromClaudeStreamCause(cause, "Claude runtime interrupted.");
241-
return isClaudeInterruptedMessage(message) ? "Claude runtime interrupted." : message;
242-
}
243-
244230
function resultErrorsText(result: SDKResultMessage): string {
245231
return "errors" in result && Array.isArray(result.errors)
246232
? result.errors.join(" ").toLowerCase()
@@ -2221,39 +2207,51 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* (
22212207
});
22222208

22232209
const runSdkStream = (context: ClaudeSessionContext) =>
2224-
Stream.fromAsyncIterable(
2225-
context.query,
2226-
(cause) =>
2227-
new ClaudeStreamError({
2228-
message: "An error occurred while running the Claude SDK stream.",
2229-
cause,
2230-
}),
2231-
).pipe(
2210+
Stream.fromAsyncIterable(context.query, (cause) => {
2211+
const message =
2212+
cause instanceof Error && cause.message.trim().length > 0
2213+
? cause.message
2214+
: "Claude runtime stream failed.";
2215+
return isClaudeInterruptedMessage(message)
2216+
? new ClaudeStreamInterruptedError({ message, cause })
2217+
: new ClaudeStreamFailedError({ message, cause });
2218+
}).pipe(
22322219
Stream.takeWhile(() => !context.stopped),
22332220
Stream.runForEach((message) => handleSdkMessage(context, message)),
22342221
);
22352222

22362223
const handleStreamExit = Effect.fn("handleStreamExit")(function* (
22372224
context: ClaudeSessionContext,
2238-
exit: Exit.Exit<void, Error>,
2225+
exit: Exit.Exit<void, ClaudeStreamError>,
22392226
) {
22402227
if (context.stopped) {
22412228
return;
22422229
}
22432230

22442231
if (Exit.isFailure(exit)) {
2245-
if (isClaudeInterruptedCause(exit.cause)) {
2232+
if (Cause.hasInterruptsOnly(exit.cause)) {
22462233
if (context.turnState) {
2247-
yield* completeTurn(
2248-
context,
2249-
"interrupted",
2250-
interruptionMessageFromClaudeCause(exit.cause),
2251-
);
2234+
yield* completeTurn(context, "interrupted", "Claude runtime interrupted.");
22522235
}
22532236
} else {
2254-
const message = messageFromClaudeStreamCause(exit.cause, "Claude runtime stream failed.");
2255-
yield* emitRuntimeError(context, message, Cause.pretty(exit.cause));
2256-
yield* completeTurn(context, "failed", message);
2237+
yield* Option.match(Cause.findErrorOption(exit.cause), {
2238+
onNone: () =>
2239+
context.turnState
2240+
? completeTurn(context, "failed", "Claude runtime stream failed.")
2241+
: Effect.void,
2242+
onSome: (streamError) =>
2243+
isClaudeStreamInterruptedError(streamError)
2244+
? context.turnState
2245+
? completeTurn(context, "interrupted", "Claude runtime interrupted.")
2246+
: Effect.void
2247+
: Effect.gen(function* () {
2248+
const message = streamError.message;
2249+
yield* emitRuntimeError(context, message, Cause.pretty(exit.cause));
2250+
if (context.turnState) {
2251+
yield* completeTurn(context, "failed", message);
2252+
}
2253+
}),
2254+
});
22572255
}
22582256
} else if (context.turnState) {
22592257
yield* completeTurn(context, "interrupted", "Claude runtime stream ended.");

0 commit comments

Comments
 (0)