Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 319 additions & 4 deletions apps/server/src/provider/Layers/ClaudeAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ async function readFirstPromptMessage(

const THREAD_ID = ThreadId.makeUnsafe("thread-claude-1");
const RESUME_THREAD_ID = ThreadId.makeUnsafe("thread-claude-resume");
const INTERRUPTED_TOOL_RESULT_TEXT =
"The user doesn't want to proceed with this tool use. The tool use was rejected (eg. if it was a file edit, the new_string was NOT written to the file). STOP what you are doing and wait for the user to tell you how to proceed.";

describe("ClaudeAdapterLive", () => {
it.effect("returns validation error for non-claude provider on startSession", () => {
Expand Down Expand Up @@ -1101,6 +1103,182 @@ describe("ClaudeAdapterLive", () => {
);
});

it.effect("does not surface ede_diagnostic-only Claude results as runtime errors", () => {
const harness = makeHarness();
return Effect.gen(function* () {
const adapter = yield* ClaudeAdapter;

const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 6).pipe(
Stream.runCollect,
Effect.forkChild,
);

const session = yield* adapter.startSession({
threadId: THREAD_ID,
provider: "claudeAgent",
runtimeMode: "full-access",
});

const turn = yield* adapter.sendTurn({
threadId: session.threadId,
input: "hello",
attachments: [],
});

harness.query.emit({
type: "result",
subtype: "error_during_execution",
is_error: false,
errors: ["[ede_diagnostic] result_type=user last_content_type=n/a stop_reason=tool_use"],
stop_reason: "tool_use",
session_id: "sdk-session-ede-diagnostic",
uuid: "result-ede-diagnostic",
} as unknown as SDKMessage);

const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber));
assert.deepEqual(
runtimeEvents.map((event) => event.type),
[
"session.started",
"session.configured",
"session.state.changed",
"turn.started",
"thread.started",
"turn.completed",
],
);

const turnCompleted = runtimeEvents[runtimeEvents.length - 1];
assert.equal(turnCompleted?.type, "turn.completed");
if (turnCompleted?.type === "turn.completed") {
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
assert.equal(turnCompleted.payload.state, "completed");
assert.isUndefined(turnCompleted.payload.errorMessage);
assert.equal(turnCompleted.payload.stopReason, "tool_use");
}
}).pipe(
Effect.provideService(Random.Random, makeDeterministicRandomService()),
Effect.provide(harness.layer),
);
});

it.effect(
"marks rejected tool results after interruptTurn as declined and completes interrupted",
() => {
const harness = makeHarness();
return Effect.gen(function* () {
const adapter = yield* ClaudeAdapter;

const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 10).pipe(
Stream.runCollect,
Effect.forkChild,
);

const session = yield* adapter.startSession({
threadId: THREAD_ID,
provider: "claudeAgent",
runtimeMode: "full-access",
});

const turn = yield* adapter.sendTurn({
threadId: session.threadId,
input: "hello",
attachments: [],
});

harness.query.emit({
type: "stream_event",
session_id: "sdk-session-interrupted-tool-result",
uuid: "stream-tool-start-interrupted",
parent_tool_use_id: null,
event: {
type: "content_block_start",
index: 1,
content_block: {
type: "tool_use",
id: "tool-bash-1",
name: "Bash",
input: {
command: "ls",
},
},
},
} as unknown as SDKMessage);

yield* adapter.interruptTurn(session.threadId, turn.turnId);

harness.query.emit({
type: "user",
session_id: "sdk-session-interrupted-tool-result",
uuid: "user-tool-result-interrupted",
parent_tool_use_id: null,
message: {
role: "user",
content: [
{
type: "tool_result",
tool_use_id: "tool-bash-1",
content: INTERRUPTED_TOOL_RESULT_TEXT,
is_error: true,
},
],
},
} as unknown as SDKMessage);

harness.query.emit({
type: "result",
subtype: "error_during_execution",
is_error: true,
errors: [INTERRUPTED_TOOL_RESULT_TEXT],
stop_reason: "tool_use",
session_id: "sdk-session-interrupted-tool-result",
uuid: "result-interrupted-tool-result",
} as unknown as SDKMessage);

const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber));
assert.deepEqual(
runtimeEvents.map((event) => event.type),
[
"session.started",
"session.configured",
"session.state.changed",
"turn.started",
"thread.started",
"item.started",
"item.updated",
"content.delta",
"item.completed",
"turn.completed",
],
);

const toolUpdated = runtimeEvents[6];
assert.equal(toolUpdated?.type, "item.updated");
if (toolUpdated?.type === "item.updated") {
assert.equal(toolUpdated.payload.status, "declined");
}

const toolCompleted = runtimeEvents[8];
assert.equal(toolCompleted?.type, "item.completed");
if (toolCompleted?.type === "item.completed") {
assert.equal(toolCompleted.payload.status, "declined");
}

const turnCompleted = runtimeEvents[9];
assert.equal(turnCompleted?.type, "turn.completed");
if (turnCompleted?.type === "turn.completed") {
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
assert.equal(turnCompleted.payload.state, "interrupted");
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
assert.equal(turnCompleted.payload.stopReason, "tool_use");
}
}).pipe(
Effect.provideService(Random.Random, makeDeterministicRandomService()),
Effect.provide(harness.layer),
);
},
);

it.effect("closes the session when the Claude stream aborts after a turn starts", () => {
const harness = makeHarness();
return Effect.gen(function* () {
Expand Down Expand Up @@ -1169,6 +1347,145 @@ describe("ClaudeAdapterLive", () => {
);
});

it.effect(
"treats Claude ede_diagnostic tool_use cancellation as interrupted without a runtime error",
() => {
const harness = makeHarness();
return Effect.gen(function* () {
const services = yield* Effect.services();
const runFork = Effect.runForkWith(services);

const adapter = yield* ClaudeAdapter;
const runtimeEvents: Array<ProviderRuntimeEvent> = [];

const runtimeEventsFiber = runFork(
Stream.runForEach(adapter.streamEvents, (event) =>
Effect.sync(() => {
runtimeEvents.push(event);
}),
),
);

yield* adapter.startSession({
threadId: THREAD_ID,
provider: "claudeAgent",
runtimeMode: "full-access",
});

const turn = yield* adapter.sendTurn({
threadId: THREAD_ID,
input: "hello",
attachments: [],
});

harness.query.fail(
new Error("[ede_diagnostic] result_type=user last_content_type=n/a stop_reason=tool_use"),
);

yield* Effect.yieldNow;
yield* Effect.yieldNow;
yield* Effect.yieldNow;
runtimeEventsFiber.interruptUnsafe();

assert.deepEqual(
runtimeEvents.map((event) => event.type),
[
"session.started",
"session.configured",
"session.state.changed",
"turn.started",
"turn.completed",
"session.exited",
],
);

const turnCompleted = runtimeEvents[4];
assert.equal(turnCompleted?.type, "turn.completed");
if (turnCompleted?.type === "turn.completed") {
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
assert.equal(turnCompleted.payload.state, "interrupted");
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
}

const sessionExited = runtimeEvents[5];
assert.equal(sessionExited?.type, "session.exited");
}).pipe(
Effect.provideService(Random.Random, makeDeterministicRandomService()),
Effect.provide(harness.layer),
);
},
);

it.effect(
"treats aborted Claude stream failures after interruptTurn as interrupted without a runtime error",
() => {
const harness = makeHarness();
return Effect.gen(function* () {
const services = yield* Effect.services();
const runFork = Effect.runForkWith(services);

const adapter = yield* ClaudeAdapter;
const runtimeEvents: Array<ProviderRuntimeEvent> = [];

const runtimeEventsFiber = runFork(
Stream.runForEach(adapter.streamEvents, (event) =>
Effect.sync(() => {
runtimeEvents.push(event);
}),
),
);

yield* adapter.startSession({
threadId: THREAD_ID,
provider: "claudeAgent",
runtimeMode: "full-access",
});

const turn = yield* adapter.sendTurn({
threadId: THREAD_ID,
input: "hello",
attachments: [],
});

yield* adapter.interruptTurn(THREAD_ID, turn.turnId);
harness.query.fail(
"Error: Request was aborted.\n at makeRequest (/$bunfs/root/src/entrypoints/cli.js:50:3448)\n at processTicksAndRejections (native:7:39)",
);

yield* Effect.yieldNow;
yield* Effect.yieldNow;
yield* Effect.yieldNow;
runtimeEventsFiber.interruptUnsafe();

assert.deepEqual(
runtimeEvents.map((event) => event.type),
[
"session.started",
"session.configured",
"session.state.changed",
"turn.started",
"turn.completed",
"session.exited",
],
);

const turnCompleted = runtimeEvents[4];
assert.equal(turnCompleted?.type, "turn.completed");
if (turnCompleted?.type === "turn.completed") {
assert.equal(String(turnCompleted.turnId), String(turn.turnId));
assert.equal(turnCompleted.payload.state, "interrupted");
assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted.");
}

const sessionExited = runtimeEvents[5];
assert.equal(sessionExited?.type, "session.exited");
}).pipe(
Effect.provideService(Random.Random, makeDeterministicRandomService()),
Effect.provide(harness.layer),
);
},
);

it.effect("stopSession does not throw into the SDK prompt consumer", () => {
// The SDK consumes user messages via `for await (... of prompt)`.
// Stopping a session must end that loop cleanly — not throw an error.
Expand Down Expand Up @@ -1228,11 +1545,9 @@ describe("ClaudeAdapterLive", () => {

runtimeEventsFiber.interruptUnsafe();

assert.equal(
assert.isUndefined(
promptConsumerError,
undefined,
`Prompt consumer should not receive a thrown error on session stop, ` +
`but got: "${promptConsumerError instanceof Error ? promptConsumerError.message : String(promptConsumerError)}"`,
"Prompt consumer should not receive a thrown error on session stop",
);
}).pipe(
Effect.provideService(Random.Random, makeDeterministicRandomService()),
Expand Down
Loading
Loading