Skip to content

Commit 384c33e

Browse files
more interupt fixes
1 parent e016fe7 commit 384c33e

2 files changed

Lines changed: 384 additions & 10 deletions

File tree

apps/server/src/provider/Layers/ClaudeAdapter.test.ts

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,8 @@ async function readFirstPromptMessage(
239239

240240
const THREAD_ID = ThreadId.makeUnsafe("thread-claude-1");
241241
const RESUME_THREAD_ID = ThreadId.makeUnsafe("thread-claude-resume");
242+
const INTERRUPTED_TOOL_RESULT_TEXT =
243+
"The user doesn't want to proceed with this tool use. The tool use was rejected (eg. if it was a file edit, the new_string was NOT written to the file). STOP what you are doing and wait for the user to tell you how to proceed.";
242244

243245
describe("ClaudeAdapterLive", () => {
244246
it.effect("returns validation error for non-claude provider on startSession", () => {
@@ -1101,6 +1103,182 @@ describe("ClaudeAdapterLive", () => {
11011103
);
11021104
});
11031105

1106+
it.effect("does not surface ede_diagnostic-only Claude results as runtime errors", () => {
1107+
const harness = makeHarness();
1108+
return Effect.gen(function* () {
1109+
const adapter = yield* ClaudeAdapter;
1110+
1111+
const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 6).pipe(
1112+
Stream.runCollect,
1113+
Effect.forkChild,
1114+
);
1115+
1116+
const session = yield* adapter.startSession({
1117+
threadId: THREAD_ID,
1118+
provider: "claudeAgent",
1119+
runtimeMode: "full-access",
1120+
});
1121+
1122+
const turn = yield* adapter.sendTurn({
1123+
threadId: session.threadId,
1124+
input: "hello",
1125+
attachments: [],
1126+
});
1127+
1128+
harness.query.emit({
1129+
type: "result",
1130+
subtype: "error_during_execution",
1131+
is_error: false,
1132+
errors: ["[ede_diagnostic] result_type=user last_content_type=n/a stop_reason=tool_use"],
1133+
stop_reason: "tool_use",
1134+
session_id: "sdk-session-ede-diagnostic",
1135+
uuid: "result-ede-diagnostic",
1136+
} as unknown as SDKMessage);
1137+
1138+
const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber));
1139+
assert.deepEqual(
1140+
runtimeEvents.map((event) => event.type),
1141+
[
1142+
"session.started",
1143+
"session.configured",
1144+
"session.state.changed",
1145+
"turn.started",
1146+
"thread.started",
1147+
"turn.completed",
1148+
],
1149+
);
1150+
1151+
const turnCompleted = runtimeEvents[runtimeEvents.length - 1];
1152+
assert.equal(turnCompleted?.type, "turn.completed");
1153+
if (turnCompleted?.type === "turn.completed") {
1154+
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
1155+
assert.equal(turnCompleted.payload.state, "completed");
1156+
assert.isUndefined(turnCompleted.payload.errorMessage);
1157+
assert.equal(turnCompleted.payload.stopReason, "tool_use");
1158+
}
1159+
}).pipe(
1160+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1161+
Effect.provide(harness.layer),
1162+
);
1163+
});
1164+
1165+
it.effect(
1166+
"marks rejected tool results after interruptTurn as declined and completes interrupted",
1167+
() => {
1168+
const harness = makeHarness();
1169+
return Effect.gen(function* () {
1170+
const adapter = yield* ClaudeAdapter;
1171+
1172+
const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 10).pipe(
1173+
Stream.runCollect,
1174+
Effect.forkChild,
1175+
);
1176+
1177+
const session = yield* adapter.startSession({
1178+
threadId: THREAD_ID,
1179+
provider: "claudeAgent",
1180+
runtimeMode: "full-access",
1181+
});
1182+
1183+
const turn = yield* adapter.sendTurn({
1184+
threadId: session.threadId,
1185+
input: "hello",
1186+
attachments: [],
1187+
});
1188+
1189+
harness.query.emit({
1190+
type: "stream_event",
1191+
session_id: "sdk-session-interrupted-tool-result",
1192+
uuid: "stream-tool-start-interrupted",
1193+
parent_tool_use_id: null,
1194+
event: {
1195+
type: "content_block_start",
1196+
index: 1,
1197+
content_block: {
1198+
type: "tool_use",
1199+
id: "tool-bash-1",
1200+
name: "Bash",
1201+
input: {
1202+
command: "ls",
1203+
},
1204+
},
1205+
},
1206+
} as unknown as SDKMessage);
1207+
1208+
yield* adapter.interruptTurn(session.threadId, turn.turnId);
1209+
1210+
harness.query.emit({
1211+
type: "user",
1212+
session_id: "sdk-session-interrupted-tool-result",
1213+
uuid: "user-tool-result-interrupted",
1214+
parent_tool_use_id: null,
1215+
message: {
1216+
role: "user",
1217+
content: [
1218+
{
1219+
type: "tool_result",
1220+
tool_use_id: "tool-bash-1",
1221+
content: INTERRUPTED_TOOL_RESULT_TEXT,
1222+
is_error: true,
1223+
},
1224+
],
1225+
},
1226+
} as unknown as SDKMessage);
1227+
1228+
harness.query.emit({
1229+
type: "result",
1230+
subtype: "error_during_execution",
1231+
is_error: true,
1232+
errors: [INTERRUPTED_TOOL_RESULT_TEXT],
1233+
stop_reason: "tool_use",
1234+
session_id: "sdk-session-interrupted-tool-result",
1235+
uuid: "result-interrupted-tool-result",
1236+
} as unknown as SDKMessage);
1237+
1238+
const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber));
1239+
assert.deepEqual(
1240+
runtimeEvents.map((event) => event.type),
1241+
[
1242+
"session.started",
1243+
"session.configured",
1244+
"session.state.changed",
1245+
"turn.started",
1246+
"thread.started",
1247+
"item.started",
1248+
"item.updated",
1249+
"content.delta",
1250+
"item.completed",
1251+
"turn.completed",
1252+
],
1253+
);
1254+
1255+
const toolUpdated = runtimeEvents[6];
1256+
assert.equal(toolUpdated?.type, "item.updated");
1257+
if (toolUpdated?.type === "item.updated") {
1258+
assert.equal(toolUpdated.payload.status, "declined");
1259+
}
1260+
1261+
const toolCompleted = runtimeEvents[8];
1262+
assert.equal(toolCompleted?.type, "item.completed");
1263+
if (toolCompleted?.type === "item.completed") {
1264+
assert.equal(toolCompleted.payload.status, "declined");
1265+
}
1266+
1267+
const turnCompleted = runtimeEvents[9];
1268+
assert.equal(turnCompleted?.type, "turn.completed");
1269+
if (turnCompleted?.type === "turn.completed") {
1270+
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
1271+
assert.equal(turnCompleted.payload.state, "interrupted");
1272+
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
1273+
assert.equal(turnCompleted.payload.stopReason, "tool_use");
1274+
}
1275+
}).pipe(
1276+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1277+
Effect.provide(harness.layer),
1278+
);
1279+
},
1280+
);
1281+
11041282
it.effect("closes the session when the Claude stream aborts after a turn starts", () => {
11051283
const harness = makeHarness();
11061284
return Effect.gen(function* () {
@@ -1169,6 +1347,145 @@ describe("ClaudeAdapterLive", () => {
11691347
);
11701348
});
11711349

1350+
it.effect(
1351+
"treats Claude ede_diagnostic tool_use cancellation as interrupted without a runtime error",
1352+
() => {
1353+
const harness = makeHarness();
1354+
return Effect.gen(function* () {
1355+
const services = yield* Effect.services();
1356+
const runFork = Effect.runForkWith(services);
1357+
1358+
const adapter = yield* ClaudeAdapter;
1359+
const runtimeEvents: Array<ProviderRuntimeEvent> = [];
1360+
1361+
const runtimeEventsFiber = runFork(
1362+
Stream.runForEach(adapter.streamEvents, (event) =>
1363+
Effect.sync(() => {
1364+
runtimeEvents.push(event);
1365+
}),
1366+
),
1367+
);
1368+
1369+
yield* adapter.startSession({
1370+
threadId: THREAD_ID,
1371+
provider: "claudeAgent",
1372+
runtimeMode: "full-access",
1373+
});
1374+
1375+
const turn = yield* adapter.sendTurn({
1376+
threadId: THREAD_ID,
1377+
input: "hello",
1378+
attachments: [],
1379+
});
1380+
1381+
harness.query.fail(
1382+
new Error("[ede_diagnostic] result_type=user last_content_type=n/a stop_reason=tool_use"),
1383+
);
1384+
1385+
yield* Effect.yieldNow;
1386+
yield* Effect.yieldNow;
1387+
yield* Effect.yieldNow;
1388+
runtimeEventsFiber.interruptUnsafe();
1389+
1390+
assert.deepEqual(
1391+
runtimeEvents.map((event) => event.type),
1392+
[
1393+
"session.started",
1394+
"session.configured",
1395+
"session.state.changed",
1396+
"turn.started",
1397+
"turn.completed",
1398+
"session.exited",
1399+
],
1400+
);
1401+
1402+
const turnCompleted = runtimeEvents[4];
1403+
assert.equal(turnCompleted?.type, "turn.completed");
1404+
if (turnCompleted?.type === "turn.completed") {
1405+
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
1406+
assert.equal(turnCompleted.payload.state, "interrupted");
1407+
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
1408+
}
1409+
1410+
const sessionExited = runtimeEvents[5];
1411+
assert.equal(sessionExited?.type, "session.exited");
1412+
}).pipe(
1413+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1414+
Effect.provide(harness.layer),
1415+
);
1416+
},
1417+
);
1418+
1419+
it.effect(
1420+
"treats aborted Claude stream failures after interruptTurn as interrupted without a runtime error",
1421+
() => {
1422+
const harness = makeHarness();
1423+
return Effect.gen(function* () {
1424+
const services = yield* Effect.services();
1425+
const runFork = Effect.runForkWith(services);
1426+
1427+
const adapter = yield* ClaudeAdapter;
1428+
const runtimeEvents: Array<ProviderRuntimeEvent> = [];
1429+
1430+
const runtimeEventsFiber = runFork(
1431+
Stream.runForEach(adapter.streamEvents, (event) =>
1432+
Effect.sync(() => {
1433+
runtimeEvents.push(event);
1434+
}),
1435+
),
1436+
);
1437+
1438+
yield* adapter.startSession({
1439+
threadId: THREAD_ID,
1440+
provider: "claudeAgent",
1441+
runtimeMode: "full-access",
1442+
});
1443+
1444+
const turn = yield* adapter.sendTurn({
1445+
threadId: THREAD_ID,
1446+
input: "hello",
1447+
attachments: [],
1448+
});
1449+
1450+
yield* adapter.interruptTurn(THREAD_ID, turn.turnId);
1451+
harness.query.fail(
1452+
"Error: Request was aborted.\n at makeRequest (/$bunfs/root/src/entrypoints/cli.js:50:3448)\n at processTicksAndRejections (native:7:39)",
1453+
);
1454+
1455+
yield* Effect.yieldNow;
1456+
yield* Effect.yieldNow;
1457+
yield* Effect.yieldNow;
1458+
runtimeEventsFiber.interruptUnsafe();
1459+
1460+
assert.deepEqual(
1461+
runtimeEvents.map((event) => event.type),
1462+
[
1463+
"session.started",
1464+
"session.configured",
1465+
"session.state.changed",
1466+
"turn.started",
1467+
"turn.completed",
1468+
"session.exited",
1469+
],
1470+
);
1471+
1472+
const turnCompleted = runtimeEvents[4];
1473+
assert.equal(turnCompleted?.type, "turn.completed");
1474+
if (turnCompleted?.type === "turn.completed") {
1475+
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
1476+
assert.equal(turnCompleted.payload.state, "interrupted");
1477+
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
1478+
}
1479+
1480+
const sessionExited = runtimeEvents[5];
1481+
assert.equal(sessionExited?.type, "session.exited");
1482+
}).pipe(
1483+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1484+
Effect.provide(harness.layer),
1485+
);
1486+
},
1487+
);
1488+
11721489
it.effect("stopSession does not throw into the SDK prompt consumer", () => {
11731490
// The SDK consumes user messages via `for await (... of prompt)`.
11741491
// Stopping a session must end that loop cleanly — not throw an error.

0 commit comments

Comments
 (0)