Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apps/cloud/src/api/protected.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ const makeBaseEngine = (): ExecutionEngine =>
status: "completed",
result: { result: "ok", logs: [] },
}),
startCell: () =>
Effect.succeed({
status: "completed",
cellId: "cell_test",
cursor: 1,
events: [],
result: { result: "ok", logs: [] },
}),
waitCell: () => Effect.succeed(null),
terminateCell: () => Effect.succeed(null),
resume: () =>
Effect.succeed({
status: "completed",
Expand Down
6 changes: 6 additions & 0 deletions apps/cloud/src/engine/execution-usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ export const withExecutionUsageTracking = <E extends Cause.YieldableError>(
engine
.executeWithPause(code)
.pipe(Effect.tap(() => Effect.sync(() => trackUsage(organizationId)))),
startCell: (code, options) =>
engine
.startCell(code, options)
.pipe(Effect.tap(() => Effect.sync(() => trackUsage(organizationId)))),
waitCell: (cellId, options) => engine.waitCell(cellId, options),
terminateCell: (cellId) => engine.terminateCell(cellId),
// resume doesn't count as usage
resume: (executionId, response) => engine.resume(executionId, response),
getPausedExecution: (executionId) => engine.getPausedExecution(executionId),
Expand Down
259 changes: 259 additions & 0 deletions e2e/scenarios/codemode-persistent-cells.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
import { expect } from "@effect/vitest";
import { Effect } from "effect";

import type { McpCallResult } from "../src/surfaces/mcp";
import { scenario } from "../src/scenario";
import { Mcp, Target } from "../src/services";

/** Body of an event item's `content` field; every property is optional and unvalidated. */
type CellEventContent = {
  readonly type?: unknown;
  readonly text?: unknown;
};

/** Body of an event item's `notification` field; every property is optional and unvalidated. */
type CellEventNotification = {
  readonly message?: unknown;
  readonly data?: unknown;
};

/** The `item` attached to an event, carrying either a `content` or a `notification` body. */
type CellEventItem = {
  readonly type?: unknown;
  readonly content?: CellEventContent;
  readonly notification?: CellEventNotification;
};

/**
 * Loosely typed view of one entry in a cell observation's `events` array.
 *
 * Leaf values stay `unknown` because they come from an untyped structured payload;
 * callers narrow them with runtime checks before use.
 */
type CellEvent = {
  readonly type?: unknown;
  readonly item?: CellEventItem;
};

/** Shape of the optional `result` field on a cell observation. */
type CellObservationResult = {
  readonly result?: unknown;
  readonly output?: readonly unknown[];
};

/**
 * Loosely typed view of the structured payload returned by the cell tools.
 *
 * Scalar fields are `unknown` on purpose: the scenario asserts their runtime type
 * before using them instead of trusting the payload.
 */
type CellObservation = {
  readonly status?: unknown;
  readonly cellId?: unknown;
  readonly cursor?: unknown;
  readonly events?: readonly CellEvent[];
  readonly result?: CellObservationResult;
};

/**
 * Extracts `structuredContent` from the raw MCP call result and returns it as a
 * {@link CellObservation}.
 *
 * Asserts (via `expect`) that the structured content matches `expect.any(Object)`;
 * the assertion message embeds the first 300 characters of the call's text output
 * to aid debugging.
 */
const cellObservation = (result: McpCallResult): CellObservation => {
  const { structuredContent } = result.raw as { readonly structuredContent?: unknown };
  const preview = result.text.slice(0, 300);
  expect(structuredContent, `cell call returned structured content: ${preview}`).toEqual(
    expect.any(Object),
  );
  return structuredContent as CellObservation;
};

/**
 * Lists the `type` of every event in the observation, in order.
 * Returns an empty array when the observation carries no `events`.
 */
const eventTypes = (observation: CellObservation): readonly unknown[] => {
  const { events } = observation;
  if (events == null) {
    return [];
  }
  return events.map(({ type }) => type);
};

/** Returns the observation's `events` array as-is, or an empty array when it is absent. */
const eventsOf = (observation: CellObservation): readonly CellEvent[] => {
  const { events } = observation;
  return events == null ? [] : events;
};

/**
 * Collects, in order, the text of every event shaped like
 * `{ type: "output", item: { type: "content", content: { type: "text", text: <string> } } }`.
 * Events that do not match that shape (including non-string `text`) are ignored.
 */
const textOutput = (events: readonly CellEvent[]): readonly string[] =>
  events.reduce<string[]>((texts, event) => {
    if (event.type !== "output" || event.item?.type !== "content") {
      return texts;
    }
    const content = event.item.content;
    if (content?.type !== "text") {
      return texts;
    }
    const text = content.text;
    if (typeof text === "string") {
      texts.push(text);
    }
    return texts;
  }, []);

/**
 * Collects, in order, the message of every event shaped like
 * `{ type: "output", item: { type: "notification", notification: { message: <string> } } }`.
 * Events that do not match that shape (including non-string `message`) are ignored.
 */
const notificationOutput = (events: readonly CellEvent[]): readonly string[] =>
  events.reduce<string[]>((messages, event) => {
    if (event.type !== "output" || event.item?.type !== "notification") {
      return messages;
    }
    const message = event.item.notification?.message;
    if (typeof message === "string") {
      messages.push(message);
    }
    return messages;
  }, []);

/**
 * Asserts (via `expect`, using `label` as the failure message) that `value` matches
 * `expect.any(String)`, then returns it typed as `string`.
 */
const requireString = (value: unknown, label: string): string => {
  const assertion = expect(value, label);
  assertion.toEqual(expect.any(String));
  return value as string;
};

/**
 * Asserts (via `expect`, using `label` as the failure message) that `value` matches
 * `expect.any(Number)`, then returns it typed as `number`.
 */
const requireNumber = (value: unknown, label: string): number => {
  const assertion = expect(value, label);
  assertion.toEqual(expect.any(Number));
  return value as number;
};

// End-to-end scenario for the persistent-cell MCP tools (`execute_cell`, `wait_cell`,
// `terminate_cell`). On a single MCP session it runs three cells in sequence:
//   1. a cell that emits text, yields twice, sends a notification, and returns a value;
//   2. a cell that completes only after a 50 ms timer;
//   3. an endless cooperative loop that is stopped with `terminate_cell`.
// Each observation's `cursor` is fed back as `after` on the next `wait_cell` call.
scenario(
"Codemode · persistent cells yield, wait, notify, and terminate through MCP",
{ timeout: 180_000 },
Effect.gen(function* () {
// Resolve the target and MCP services and open a session for a freshly created identity.
const target = yield* Target;
const mcp = yield* Mcp;
const identity = yield* target.newIdentity();
const session = mcp.session(identity);

// The three cell tools must be advertised before any of them is exercised.
const tools = yield* session.listTools();
expect(tools, "persistent cell tools are advertised").toEqual(
expect.arrayContaining(["execute_cell", "wait_cell", "terminate_cell"]),
);

// --- Cell 1: multi-phase cell ------------------------------------------------------
// The cell code calls both `yield_control()` and `yieldControl()`, so both spellings
// are exercised by this scenario.
// NOTE(review): `yieldAfterMs` presumably bounds how long the call waits before
// returning a still-running observation — confirm against the tool's input schema.
const first = yield* session.call("execute_cell", {
yieldAfterMs: 250,
code: [
'text("phase 1");',
"await yield_control();",
'text("phase 2");',
"await yieldControl();",
'notify({ message: "cell almost done", data: { phase: 3 } });',
'text("phase 3");',
'return { cell: "done" };',
].join("\n"),
});
expect(first.ok, `execute_cell starts and reaches the first yield: ${first.text}`).toBe(true);

const firstCell = cellObservation(first);
expect(firstCell.status, "the first observation leaves the cell running").toBe("running");
const cellId = requireString(firstCell.cellId, "execute_cell returns a reusable cell id");
// `cursor` is reassigned after every observation of this cell and passed as `after`.
let cursor = requireNumber(firstCell.cursor, "execute_cell returns an event cursor");
const firstEvents: CellEvent[] = [...eventsOf(firstCell)];
// If the initial observation did not already contain a `yielded` event, poll
// `wait_cell` at most twice until one has been collected.
for (
let attempt = 0;
attempt < 2 && !firstEvents.some((event) => event.type === "yielded");
attempt++
) {
const yielded = yield* session.call("wait_cell", {
cellId,
after: cursor,
timeoutMs: 5_000,
});
expect(yielded.ok, `wait_cell observes the first yield: ${yielded.text}`).toBe(true);
const yieldedCell = cellObservation(yielded);
cursor = requireNumber(yieldedCell.cursor, "wait_cell advances the cursor");
firstEvents.push(...eventsOf(yieldedCell));
}
expect(
firstEvents.map((event) => event.type),
"the first phase includes output and an explicit yield checkpoint",
).toEqual(expect.arrayContaining(["output", "yielded"]));
expect(textOutput(firstEvents), "phase 1 output is visible immediately").toContain("phase 1");

// Phase 2: poll at most four times, stopping early once both the "phase 2" text
// and a `yielded` event have been collected since the previous cursor.
const secondEvents: CellEvent[] = [];
for (let attempt = 0; attempt < 4; attempt++) {
const next = yield* session.call("wait_cell", {
cellId,
after: cursor,
timeoutMs: 5_000,
});
expect(next.ok, `wait_cell observes phase 2 progress: ${next.text}`).toBe(true);
const nextCell = cellObservation(next);
cursor = requireNumber(nextCell.cursor, "wait_cell advances the phase 2 cursor");
secondEvents.push(...eventsOf(nextCell));
if (
textOutput(secondEvents).includes("phase 2") &&
secondEvents.some((event) => event.type === "yielded")
) {
break;
}
}
expect(textOutput(secondEvents), "phase 2 output is delivered incrementally").toContain(
"phase 2",
);
expect(
secondEvents.map((event) => event.type),
"phase 2 includes another explicit yield checkpoint",
).toContain("yielded");

// Phase 3: poll at most five times until an observation reports status "completed".
// `completedCell` stays undefined if that never happens, which fails the assertion below.
const finalEvents: CellEvent[] = [];
let completedCell: CellObservation | undefined;
for (let attempt = 0; attempt < 5; attempt++) {
const next = yield* session.call("wait_cell", {
cellId,
after: cursor,
timeoutMs: 5_000,
});
expect(next.ok, `wait_cell observes completion progress: ${next.text}`).toBe(true);
const nextCell = cellObservation(next);
cursor = requireNumber(nextCell.cursor, "wait_cell advances the completion cursor");
finalEvents.push(...eventsOf(nextCell));
if (nextCell.status === "completed") {
completedCell = nextCell;
break;
}
}
expect(completedCell, "wait_cell eventually observes the completed cell").toEqual(
expect.any(Object),
);

// The cast is backed by the assertion just above; the checks below cover the final
// status, the notification, the last text output, and the cell's return value.
const observedCompletedCell = completedCell as CellObservation;
expect(observedCompletedCell.status, "the final observation is completed").toBe("completed");
expect(
finalEvents.map((event) => event.type),
"completion includes final outputs and terminal event",
).toEqual(expect.arrayContaining(["output", "completed"]));
expect(
notificationOutput(finalEvents),
"notifications are emitted as structured events",
).toContain("cell almost done");
expect(textOutput(finalEvents), "phase 3 output is visible before completion").toContain(
"phase 3",
);
expect(
observedCompletedCell.result?.result,
"the returned value is preserved on completion",
).toEqual({ cell: "done" });

// --- Cell 2: timer-backed cell -----------------------------------------------------
// The cell awaits a 50 ms `setTimeout` while `yieldAfterMs` is only 10, and never
// calls a yield function itself.
const timerRun = yield* session.call("execute_cell", {
yieldAfterMs: 10,
code: [
"await new Promise((resolve) => setTimeout(resolve, 50));",
'text("timer fired");',
'return { timer: "done" };',
].join("\n"),
});
expect(timerRun.ok, `execute_cell starts a timer-backed cell: ${timerRun.text}`).toBe(true);
// `timerCell` always holds the most recent observation; the loop is skipped entirely
// if the first observation is already "completed".
let timerCell = cellObservation(timerRun);
const timerCellId = requireString(timerCell.cellId, "timer cell id is reusable");
let timerCursor = requireNumber(timerCell.cursor, "timer cell returns an event cursor");
const timerEvents: CellEvent[] = [...eventsOf(timerCell)];
for (let attempt = 0; attempt < 5 && timerCell.status !== "completed"; attempt++) {
const next = yield* session.call("wait_cell", {
cellId: timerCellId,
after: timerCursor,
timeoutMs: 5_000,
});
expect(next.ok, `wait_cell observes timer-backed completion: ${next.text}`).toBe(true);
timerCell = cellObservation(next);
timerCursor = requireNumber(timerCell.cursor, "wait_cell advances the timer cursor");
timerEvents.push(...eventsOf(timerCell));
}
expect(timerCell.status, "timer-backed cells can complete after the first observation").toBe(
"completed",
);
expect(textOutput(timerEvents), "timer output is visible on completion").toContain(
"timer fired",
);
expect(timerCell.result?.result, "timer cell return value is preserved").toEqual({
timer: "done",
});

// --- Cell 3: endless cooperative loop, stopped via terminate_cell ------------------
// The loop never returns on its own; it emits text and yields on every iteration.
const running = yield* session.call("execute_cell", {
yieldAfterMs: 250,
code: [
"let i = 0;",
"while (true) {",
" text(`loop ${i}`);",
" i += 1;",
" await yield_control();",
"}",
].join("\n"),
});
expect(running.ok, `execute_cell starts a cooperative long-running cell: ${running.text}`).toBe(
true,
);
const runningCell = cellObservation(running);
expect(runningCell.status, "the loop cell is running after its first yield").toBe("running");
const runningCellId = requireString(runningCell.cellId, "loop cell id is reusable");

// Terminating must return an observation whose status is "terminated" and whose
// own events include a `terminated` entry.
const terminated = yield* session.call("terminate_cell", {
cellId: runningCellId,
});
expect(terminated.ok, `terminate_cell returns a terminal observation: ${terminated.text}`).toBe(
true,
);
const terminatedCell = cellObservation(terminated);
expect(terminatedCell.status, "terminate_cell marks the cell terminated").toBe("terminated");
expect(eventTypes(terminatedCell), "the termination event is visible to clients").toContain(
"terminated",
);
}),
);
55 changes: 48 additions & 7 deletions packages/core/api/src/executions/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ const ExecuteRequest = Schema.Struct({
code: Schema.String,
});

// Request payload for the `startCell` endpoint (`POST /execution-cells`).
// `code` is required; `yieldAfterMs` is an optional number.
// NOTE(review): `yieldAfterMs` is presumably a millisecond budget before the call
// returns a still-running observation — confirm against the engine's `startCell` options.
const StartCellRequest = Schema.Struct({
code: Schema.String,
yieldAfterMs: Schema.optional(Schema.Number),
});

const CompletedResult = Schema.Struct({
status: Schema.Literal("completed"),
text: Schema.String,
Expand All @@ -26,6 +31,15 @@ const PausedResult = Schema.Struct({

// Success body of the `execute` endpoint: either a completed or a paused result.
const ExecuteResponse = Schema.Union([CompletedResult, PausedResult]);

// Success body shared by the `startCell`, `waitCell`, and `terminateCell` endpoints.
// `status` is one of four literals; `events` entries, `result`, and `error` are not
// validated beyond being present/optional, so their shape is opaque to this schema.
const CellObservation = Schema.Struct({
status: Schema.Literals(["running", "completed", "failed", "terminated"]),
cellId: Schema.String,
cursor: Schema.Number,
events: Schema.Array(Schema.Unknown),
result: Schema.optional(Schema.Unknown),
error: Schema.optional(Schema.String),
});

const ResumeRequest = Schema.Struct({
action: Schema.Literals(["accept", "decline", "cancel"]),
content: Schema.optional(Schema.Unknown),
Expand All @@ -47,26 +61,53 @@ const ExecutionNotFoundError = Schema.TaggedStruct("ExecutionNotFoundError", {
// ---------------------------------------------------------------------------

// Path parameters for the `/executions/:executionId` routes.
const ExecutionParams = { executionId: Schema.String };
// Path parameters for the `/execution-cells/:cellId` routes.
const CellParams = { cellId: Schema.String };
// Query-string parameters accepted by the `waitCell` endpoint. Both are optional and
// declared as strings.
// NOTE(review): presumably the handler parses `after` and `timeoutMs` into numbers —
// confirm, and check how non-numeric input is rejected.
const CellWaitQuery = Schema.Struct({
after: Schema.optional(Schema.String),
timeoutMs: Schema.optional(Schema.String),
});

// ---------------------------------------------------------------------------
// Group
// ---------------------------------------------------------------------------

export const ExecutionsApi = HttpApiGroup.make("executions")
.add(
HttpApiEndpoint.get("getPaused", "/executions/:executionId", {
params: ExecutionParams,
success: PausedExecutionInfo,
error: [InternalError, ExecutionNotFoundError],
}),
)
.add(
HttpApiEndpoint.post("execute", "/executions", {
payload: ExecuteRequest,
success: ExecuteResponse,
error: InternalError,
}),
)
.add(
HttpApiEndpoint.post("startCell", "/execution-cells", {
payload: StartCellRequest,
success: CellObservation,
error: InternalError,
}),
)
.add(
HttpApiEndpoint.get("waitCell", "/execution-cells/:cellId", {
params: CellParams,
query: CellWaitQuery,
success: CellObservation,
error: [InternalError, ExecutionNotFoundError],
}),
)
.add(
HttpApiEndpoint.post("terminateCell", "/execution-cells/:cellId/terminate", {
params: CellParams,
success: CellObservation,
error: [InternalError, ExecutionNotFoundError],
}),
)
.add(
HttpApiEndpoint.get("getPaused", "/executions/:executionId", {
params: ExecutionParams,
success: PausedExecutionInfo,
error: [InternalError, ExecutionNotFoundError],
}),
)
.add(
HttpApiEndpoint.post("resume", "/executions/:executionId/resume", {
params: ExecutionParams,
Expand Down
Loading
Loading