Skip to content

Commit 7510b9f

Browse files
committed
fix: harden Claude stream interruption handling
Cherry-picked from upstream pingdotgg#1893
1 parent 7184e82 commit 7510b9f

3 files changed

Lines changed: 447 additions & 76 deletions

File tree

apps/server/src/provider/Layers/ClaudeAdapter.test.ts

Lines changed: 319 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,8 @@ async function readFirstPromptMessage(
239239

240240
const THREAD_ID = ThreadId.make("thread-claude-1");
241241
const RESUME_THREAD_ID = ThreadId.make("thread-claude-resume");
242+
const INTERRUPTED_TOOL_RESULT_TEXT =
243+
"The user doesn't want to proceed with this tool use. The tool use was rejected (eg. if it was a file edit, the new_string was NOT written to the file). STOP what you are doing and wait for the user to tell you how to proceed.";
242244

243245
describe("ClaudeAdapterLive", () => {
244246
it.effect("returns validation error for non-claude provider on startSession", () => {
@@ -1358,6 +1360,182 @@ describe("ClaudeAdapterLive", () => {
13581360
);
13591361
});
13601362

1363+
it.effect("does not surface ede_diagnostic-only Claude results as runtime errors", () => {
1364+
const harness = makeHarness();
1365+
return Effect.gen(function* () {
1366+
const adapter = yield* ClaudeAdapter;
1367+
1368+
const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 6).pipe(
1369+
Stream.runCollect,
1370+
Effect.forkChild,
1371+
);
1372+
1373+
const session = yield* adapter.startSession({
1374+
threadId: THREAD_ID,
1375+
provider: "claudeAgent",
1376+
runtimeMode: "full-access",
1377+
});
1378+
1379+
const turn = yield* adapter.sendTurn({
1380+
threadId: session.threadId,
1381+
input: "hello",
1382+
attachments: [],
1383+
});
1384+
1385+
harness.query.emit({
1386+
type: "result",
1387+
subtype: "error_during_execution",
1388+
is_error: false,
1389+
errors: ["[ede_diagnostic] result_type=user last_content_type=n/a stop_reason=tool_use"],
1390+
stop_reason: "tool_use",
1391+
session_id: "sdk-session-ede-diagnostic",
1392+
uuid: "result-ede-diagnostic",
1393+
} as unknown as SDKMessage);
1394+
1395+
const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber));
1396+
assert.deepEqual(
1397+
runtimeEvents.map((event) => event.type),
1398+
[
1399+
"session.started",
1400+
"session.configured",
1401+
"session.state.changed",
1402+
"turn.started",
1403+
"thread.started",
1404+
"turn.completed",
1405+
],
1406+
);
1407+
1408+
const turnCompleted = runtimeEvents[runtimeEvents.length - 1];
1409+
assert.equal(turnCompleted?.type, "turn.completed");
1410+
if (turnCompleted?.type === "turn.completed") {
1411+
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
1412+
assert.equal(turnCompleted.payload.state, "completed");
1413+
assert.isUndefined(turnCompleted.payload.errorMessage);
1414+
assert.equal(turnCompleted.payload.stopReason, "tool_use");
1415+
}
1416+
}).pipe(
1417+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1418+
Effect.provide(harness.layer),
1419+
);
1420+
});
1421+
1422+
it.effect(
1423+
"marks rejected tool results after interruptTurn as declined and completes interrupted",
1424+
() => {
1425+
const harness = makeHarness();
1426+
return Effect.gen(function* () {
1427+
const adapter = yield* ClaudeAdapter;
1428+
1429+
const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 10).pipe(
1430+
Stream.runCollect,
1431+
Effect.forkChild,
1432+
);
1433+
1434+
const session = yield* adapter.startSession({
1435+
threadId: THREAD_ID,
1436+
provider: "claudeAgent",
1437+
runtimeMode: "full-access",
1438+
});
1439+
1440+
const turn = yield* adapter.sendTurn({
1441+
threadId: session.threadId,
1442+
input: "hello",
1443+
attachments: [],
1444+
});
1445+
1446+
harness.query.emit({
1447+
type: "stream_event",
1448+
session_id: "sdk-session-interrupted-tool-result",
1449+
uuid: "stream-tool-start-interrupted",
1450+
parent_tool_use_id: null,
1451+
event: {
1452+
type: "content_block_start",
1453+
index: 1,
1454+
content_block: {
1455+
type: "tool_use",
1456+
id: "tool-bash-1",
1457+
name: "Bash",
1458+
input: {
1459+
command: "ls",
1460+
},
1461+
},
1462+
},
1463+
} as unknown as SDKMessage);
1464+
1465+
yield* adapter.interruptTurn(session.threadId, turn.turnId);
1466+
1467+
harness.query.emit({
1468+
type: "user",
1469+
session_id: "sdk-session-interrupted-tool-result",
1470+
uuid: "user-tool-result-interrupted",
1471+
parent_tool_use_id: null,
1472+
message: {
1473+
role: "user",
1474+
content: [
1475+
{
1476+
type: "tool_result",
1477+
tool_use_id: "tool-bash-1",
1478+
content: INTERRUPTED_TOOL_RESULT_TEXT,
1479+
is_error: true,
1480+
},
1481+
],
1482+
},
1483+
} as unknown as SDKMessage);
1484+
1485+
harness.query.emit({
1486+
type: "result",
1487+
subtype: "error_during_execution",
1488+
is_error: true,
1489+
errors: [INTERRUPTED_TOOL_RESULT_TEXT],
1490+
stop_reason: "tool_use",
1491+
session_id: "sdk-session-interrupted-tool-result",
1492+
uuid: "result-interrupted-tool-result",
1493+
} as unknown as SDKMessage);
1494+
1495+
const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber));
1496+
assert.deepEqual(
1497+
runtimeEvents.map((event) => event.type),
1498+
[
1499+
"session.started",
1500+
"session.configured",
1501+
"session.state.changed",
1502+
"turn.started",
1503+
"thread.started",
1504+
"item.started",
1505+
"item.updated",
1506+
"content.delta",
1507+
"item.completed",
1508+
"turn.completed",
1509+
],
1510+
);
1511+
1512+
const toolUpdated = runtimeEvents[6];
1513+
assert.equal(toolUpdated?.type, "item.updated");
1514+
if (toolUpdated?.type === "item.updated") {
1515+
assert.equal(toolUpdated.payload.status, "declined");
1516+
}
1517+
1518+
const toolCompleted = runtimeEvents[8];
1519+
assert.equal(toolCompleted?.type, "item.completed");
1520+
if (toolCompleted?.type === "item.completed") {
1521+
assert.equal(toolCompleted.payload.status, "declined");
1522+
}
1523+
1524+
const turnCompleted = runtimeEvents[9];
1525+
assert.equal(turnCompleted?.type, "turn.completed");
1526+
if (turnCompleted?.type === "turn.completed") {
1527+
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
1528+
assert.equal(turnCompleted.payload.state, "interrupted");
1529+
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
1530+
assert.equal(turnCompleted.payload.stopReason, "tool_use");
1531+
}
1532+
}).pipe(
1533+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1534+
Effect.provide(harness.layer),
1535+
);
1536+
},
1537+
);
1538+
13611539
it.effect("closes the session when the Claude stream aborts after a turn starts", () => {
13621540
const harness = makeHarness();
13631541
return Effect.gen(function* () {
@@ -1426,6 +1604,145 @@ describe("ClaudeAdapterLive", () => {
14261604
);
14271605
});
14281606

1607+
it.effect(
1608+
"treats Claude ede_diagnostic tool_use cancellation as interrupted without a runtime error",
1609+
() => {
1610+
const harness = makeHarness();
1611+
return Effect.gen(function* () {
1612+
const services = yield* Effect.services();
1613+
const runFork = Effect.runForkWith(services);
1614+
1615+
const adapter = yield* ClaudeAdapter;
1616+
const runtimeEvents: Array<ProviderRuntimeEvent> = [];
1617+
1618+
const runtimeEventsFiber = runFork(
1619+
Stream.runForEach(adapter.streamEvents, (event) =>
1620+
Effect.sync(() => {
1621+
runtimeEvents.push(event);
1622+
}),
1623+
),
1624+
);
1625+
1626+
yield* adapter.startSession({
1627+
threadId: THREAD_ID,
1628+
provider: "claudeAgent",
1629+
runtimeMode: "full-access",
1630+
});
1631+
1632+
const turn = yield* adapter.sendTurn({
1633+
threadId: THREAD_ID,
1634+
input: "hello",
1635+
attachments: [],
1636+
});
1637+
1638+
harness.query.fail(
1639+
new Error("[ede_diagnostic] result_type=user last_content_type=n/a stop_reason=tool_use"),
1640+
);
1641+
1642+
yield* Effect.yieldNow;
1643+
yield* Effect.yieldNow;
1644+
yield* Effect.yieldNow;
1645+
runtimeEventsFiber.interruptUnsafe();
1646+
1647+
assert.deepEqual(
1648+
runtimeEvents.map((event) => event.type),
1649+
[
1650+
"session.started",
1651+
"session.configured",
1652+
"session.state.changed",
1653+
"turn.started",
1654+
"turn.completed",
1655+
"session.exited",
1656+
],
1657+
);
1658+
1659+
const turnCompleted = runtimeEvents[4];
1660+
assert.equal(turnCompleted?.type, "turn.completed");
1661+
if (turnCompleted?.type === "turn.completed") {
1662+
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
1663+
assert.equal(turnCompleted.payload.state, "interrupted");
1664+
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
1665+
}
1666+
1667+
const sessionExited = runtimeEvents[5];
1668+
assert.equal(sessionExited?.type, "session.exited");
1669+
}).pipe(
1670+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1671+
Effect.provide(harness.layer),
1672+
);
1673+
},
1674+
);
1675+
1676+
it.effect(
1677+
"treats aborted Claude stream failures after interruptTurn as interrupted without a runtime error",
1678+
() => {
1679+
const harness = makeHarness();
1680+
return Effect.gen(function* () {
1681+
const services = yield* Effect.services();
1682+
const runFork = Effect.runForkWith(services);
1683+
1684+
const adapter = yield* ClaudeAdapter;
1685+
const runtimeEvents: Array<ProviderRuntimeEvent> = [];
1686+
1687+
const runtimeEventsFiber = runFork(
1688+
Stream.runForEach(adapter.streamEvents, (event) =>
1689+
Effect.sync(() => {
1690+
runtimeEvents.push(event);
1691+
}),
1692+
),
1693+
);
1694+
1695+
yield* adapter.startSession({
1696+
threadId: THREAD_ID,
1697+
provider: "claudeAgent",
1698+
runtimeMode: "full-access",
1699+
});
1700+
1701+
const turn = yield* adapter.sendTurn({
1702+
threadId: THREAD_ID,
1703+
input: "hello",
1704+
attachments: [],
1705+
});
1706+
1707+
yield* adapter.interruptTurn(THREAD_ID, turn.turnId);
1708+
harness.query.fail(
1709+
"Error: Request was aborted.\n at makeRequest (/$bunfs/root/src/entrypoints/cli.js:50:3448)\n at processTicksAndRejections (native:7:39)",
1710+
);
1711+
1712+
yield* Effect.yieldNow;
1713+
yield* Effect.yieldNow;
1714+
yield* Effect.yieldNow;
1715+
runtimeEventsFiber.interruptUnsafe();
1716+
1717+
assert.deepEqual(
1718+
runtimeEvents.map((event) => event.type),
1719+
[
1720+
"session.started",
1721+
"session.configured",
1722+
"session.state.changed",
1723+
"turn.started",
1724+
"turn.completed",
1725+
"session.exited",
1726+
],
1727+
);
1728+
1729+
const turnCompleted = runtimeEvents[4];
1730+
assert.equal(turnCompleted?.type, "turn.completed");
1731+
if (turnCompleted?.type === "turn.completed") {
1732+
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
1733+
assert.equal(turnCompleted.payload.state, "interrupted");
1734+
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
1735+
}
1736+
1737+
const sessionExited = runtimeEvents[5];
1738+
assert.equal(sessionExited?.type, "session.exited");
1739+
}).pipe(
1740+
Effect.provideService(Random.Random, makeDeterministicRandomService()),
1741+
Effect.provide(harness.layer),
1742+
);
1743+
},
1744+
);
1745+
14291746
it.effect("stopSession does not throw into the SDK prompt consumer", () => {
14301747
// The SDK consumes user messages via `for await (... of prompt)`.
14311748
// Stopping a session must end that loop cleanly — not throw an error.
@@ -1485,11 +1802,9 @@ describe("ClaudeAdapterLive", () => {
14851802

14861803
runtimeEventsFiber.interruptUnsafe();
14871804

1488-
assert.equal(
1805+
assert.isUndefined(
14891806
promptConsumerError,
1490-
undefined,
1491-
`Prompt consumer should not receive a thrown error on session stop, ` +
1492-
`but got: "${promptConsumerError instanceof Error ? promptConsumerError.message : String(promptConsumerError)}"`,
1807+
"Prompt consumer should not receive a thrown error on session stop",
14931808
);
14941809
}).pipe(
14951810
Effect.provideService(Random.Random, makeDeterministicRandomService()),

0 commit comments

Comments
 (0)