diff --git a/apps/api/package.json b/apps/api/package.json index 7bbbf85f..6773a707 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -31,7 +31,7 @@ "drizzle-kit": "0.31.10", "partyserver": "^0.5.6", "typescript": "^5.9.3", - "vitest": "^4.1.6", + "vitest": "^4.1.9", "wrangler": "^4.90.1" }, "dependencies": { @@ -48,7 +48,7 @@ "aws4fetch": "^1.0.20", "cron-parser": "^5.5.0", "drizzle-orm": "0.45.2", - "hono": "^4.12.18", + "hono": "^4.12.26", "jose": "^6.2.3", "nanoid": "^5.1.11", "openai": "^6.37.0", diff --git a/apps/api/src/context.ts b/apps/api/src/context.ts index 0c3e69bb..bcf23c3c 100644 --- a/apps/api/src/context.ts +++ b/apps/api/src/context.ts @@ -4,6 +4,7 @@ import type { FFmpegContainer } from "./containers/ffmpeg-container"; import type { LanguageSandbox } from "./containers/language-sandbox"; import type { AgentRunner } from "./durable-objects/agent-runner"; import { DatabaseDO } from "./durable-objects/database-do"; +import type { EmailAgentRunner } from "./durable-objects/email-agent-runner"; import type { MailboxDO } from "./durable-objects/mailbox-do"; import type { WorkflowAgent } from "./durable-objects/workflow-agent"; @@ -17,6 +18,7 @@ export interface Bindings { WORKFLOW_AGENT: DurableObjectNamespace; DATABASE: DurableObjectNamespace; AGENT_RUNNER: DurableObjectNamespace; + EMAIL_AGENT_RUNNER: DurableObjectNamespace; MAILBOX: DurableObjectNamespace; FFMPEG_CONTAINER?: DurableObjectNamespace; DUCKDB_SANDBOX?: DurableObjectNamespace; diff --git a/apps/api/src/durable-objects/agent-llm.ts b/apps/api/src/durable-objects/agent-llm.ts new file mode 100644 index 00000000..c3a234e4 --- /dev/null +++ b/apps/api/src/durable-objects/agent-llm.ts @@ -0,0 +1,428 @@ +/** + * Provider-agnostic LLM dispatch shared by the agent Durable Objects. + * + * Converts the generic {@link AgentMessage} history into each provider's wire + * format, calls the model through the AI Gateway, and normalises the response + * into an {@link LLMResponse}. Extracted so both AgentRunner and + * EmailAgentRunner share one implementation. + */ + +import Anthropic from "@anthropic-ai/sdk"; +import type { ToolDefinition } from "@dafthunk/runtime"; +import type { AgentProvider } from "@dafthunk/runtime/nodes/agent/base-agent-node"; +import type { + AgentMessage, + LLMResponse, +} from "@dafthunk/runtime/utils/agent-loop"; +import { + getAnthropicConfig, + getGoogleAIConfig, + getOpenAIConfig, +} from "@dafthunk/runtime/utils/ai-gateway"; +import { GoogleGenAI } from "@google/genai"; +import OpenAI from "openai"; + +import type { Bindings } from "../context"; + +export interface CallLLMArgs { + provider: AgentProvider; + model: string; + instructions: string; + messages: AgentMessage[]; + tools: ToolDefinition[]; + /** Provider built-in tools (e.g. Gemini googleSearch) merged with `tools`. */ + builtInTools?: Record[]; + /** JSON schema constraining the output (structured output). */ + schema?: Record; +} + +/** Dispatch an LLM call to the configured provider. */ +export function callAgentLLM( + env: Bindings, + args: CallLLMArgs +): Promise { + const { provider } = args; + switch (provider) { + case "anthropic": + return callAnthropic(env, args); + case "google": + return callGoogle(env, args); + case "openai": + return callOpenAI(env, args); + case "workers-ai": + return callWorkersAI(env, args); + default: + throw new Error(`Unsupported provider: ${provider}`); + } +} + +// ── Anthropic ────────────────────────────────────────────────────────────── + +async function callAnthropic( + env: Bindings, + { model, instructions, messages, tools, schema }: CallLLMArgs +): Promise { + const client = new Anthropic({ + apiKey: "gateway-managed", + timeout: 120_000, + ...getAnthropicConfig(env), + }); + + const anthropicMessages: Anthropic.MessageParam[] = messages.map((m) => { + if (m.role === "assistant" && m.toolCalls?.length) { + return { + role: "assistant" as const, + content: [ + ...(m.content ? [{ type: "text" as const, text: m.content }] : []), + ...m.toolCalls.map((tc) => ({ + type: "tool_use" as const, + id: tc.id, + name: tc.name, + input: tc.arguments, + })), + ], + }; + } + if (m.role === "tool") { + return { + role: "user" as const, + content: [ + { + type: "tool_result" as const, + tool_use_id: m.toolCallId ?? "", + content: m.content, + }, + ], + }; + } + return { + role: m.role === "user" ? ("user" as const) : ("assistant" as const), + content: m.content, + }; + }); + + const anthropicTools: Anthropic.Tool[] = tools.map((t) => ({ + name: t.name, + description: t.description, + input_schema: t.parameters as Anthropic.Tool.InputSchema, + })); + + const systemPrompt = schema + ? `${instructions}\n\nYou MUST respond with valid JSON matching this schema:\n${JSON.stringify(schema)}` + : instructions; + + const response = await client.messages.create({ + model, + max_tokens: 4096, + messages: anthropicMessages, + ...(systemPrompt && { system: systemPrompt }), + ...(anthropicTools.length > 0 && { tools: anthropicTools }), + }); + + let content = ""; + const toolCalls: LLMResponse["toolCalls"] = []; + for (const block of response.content) { + if (block.type === "text") { + content += block.text; + } else if (block.type === "tool_use") { + toolCalls.push({ + id: block.id, + name: block.name, + arguments: block.input as Record, + }); + } + } + + return { + content, + toolCalls, + inputTokens: response.usage.input_tokens, + outputTokens: response.usage.output_tokens, + }; +} + +// ── Google (Gemini) ────────────────────────────────────────────────────────── + +async function callGoogle( + env: Bindings, + { model, instructions, messages, tools, builtInTools, schema }: CallLLMArgs +): Promise { + const ai = new GoogleGenAI({ + apiKey: "gateway-managed", + ...getGoogleAIConfig(env), + }); + + const contents: Array<{ + role: string; + parts: Array>; + }> = []; + for (const m of messages) { + if (m.role === "user") { + contents.push({ role: "user", parts: [{ text: m.content }] }); + } else if (m.role === "assistant") { + const parts: Array> = []; + if (m.content) parts.push({ text: m.content }); + if (m.toolCalls) { + for (const tc of m.toolCalls) { + parts.push({ + functionCall: { name: tc.name, args: tc.arguments }, + ...(tc.thoughtSignature && { + thoughtSignature: tc.thoughtSignature, + }), + }); + } + } + contents.push({ role: "model", parts }); + } else if (m.role === "tool") { + contents.push({ + role: "user", + parts: [ + { + functionResponse: { + name: m.toolName, + response: safeJsonParse(m.content), + }, + }, + ], + }); + } + } + + const functionDeclarations = tools.map((t) => ({ + name: t.name, + description: t.description, + parameters: t.parameters, + })); + + const config: Record = {}; + const allTools: Record[] = [...(builtInTools ?? [])]; + if (functionDeclarations.length > 0) { + allTools.push({ functionDeclarations }); + } + if (allTools.length > 0) { + config.tools = allTools; + } + if (schema) { + config.responseMimeType = "application/json"; + config.responseSchema = schema; + } + + const response = await ai.models.generateContent({ + model, + contents: contents as any, + config: config as any, + ...(instructions && { systemInstruction: instructions }), + }); + + let content = ""; + const toolCalls: LLMResponse["toolCalls"] = []; + if (response.candidates?.[0]?.content?.parts) { + for (const part of response.candidates[0].content.parts as any[]) { + if (part.text) { + content += part.text; + } + if (part.functionCall) { + toolCalls.push({ + id: `gemini_${geminiCallId()}`, + name: part.functionCall.name, + arguments: part.functionCall.args ?? {}, + ...(part.thoughtSignature && { + thoughtSignature: part.thoughtSignature, + }), + }); + } + } + } + + const usage = response.usageMetadata; + return { + content, + toolCalls, + inputTokens: usage?.promptTokenCount ?? 0, + outputTokens: usage?.candidatesTokenCount ?? 0, + }; +} + +// ── OpenAI ─────────────────────────────────────────────────────────────────── + +async function callOpenAI( + env: Bindings, + { model, instructions, messages, tools, schema }: CallLLMArgs +): Promise { + const client = new OpenAI({ + apiKey: "gateway-managed", + timeout: 120_000, + ...getOpenAIConfig(env), + }); + + const openaiMessages: OpenAI.ChatCompletionMessageParam[] = []; + if (instructions) { + openaiMessages.push({ role: "system", content: instructions }); + } + for (const m of messages) { + if (m.role === "user") { + openaiMessages.push({ role: "user", content: m.content }); + } else if (m.role === "assistant") { + openaiMessages.push({ + role: "assistant", + content: m.content || null, + ...(m.toolCalls?.length && { + tool_calls: m.toolCalls.map((tc) => ({ + id: tc.id, + type: "function" as const, + function: { + name: tc.name, + arguments: JSON.stringify(tc.arguments), + }, + })), + }), + }); + } else if (m.role === "tool") { + openaiMessages.push({ + role: "tool", + tool_call_id: m.toolCallId ?? "", + content: m.content, + }); + } + } + + const openaiTools: OpenAI.ChatCompletionTool[] = tools.map((t) => ({ + type: "function" as const, + function: { + name: t.name, + description: t.description, + parameters: t.parameters, + }, + })); + + const responseFormat = schema + ? { + type: "json_schema" as const, + json_schema: { name: "response", schema, strict: true }, + } + : undefined; + + const completion = await client.chat.completions.create({ + model, + max_tokens: 4096, + messages: openaiMessages, + ...(openaiTools.length > 0 && { tools: openaiTools }), + ...(responseFormat && { response_format: responseFormat }), + }); + + const choice = completion.choices[0]; + const content = choice?.message?.content ?? ""; + const toolCalls: LLMResponse["toolCalls"] = []; + if (choice?.message?.tool_calls) { + for (const tc of choice.message.tool_calls) { + if (tc.type === "function") { + toolCalls.push({ + id: tc.id, + name: tc.function.name, + arguments: safeJsonParse(tc.function.arguments), + }); + } + } + } + + return { + content, + toolCalls, + inputTokens: completion.usage?.prompt_tokens ?? 0, + outputTokens: completion.usage?.completion_tokens ?? 0, + }; +} + +// ── Workers AI ─────────────────────────────────────────────────────────────── + +async function callWorkersAI( + env: Bindings, + { model, instructions, messages, tools, schema }: CallLLMArgs +): Promise { + const aiMessages: Array<{ role: string; content: string }> = []; + + const systemPrompt = schema + ? `${instructions}\n\nYou MUST respond with valid JSON matching this schema:\n${JSON.stringify(schema)}` + : instructions; + if (systemPrompt) { + aiMessages.push({ role: "system", content: systemPrompt }); + } + + for (const m of messages) { + if (m.role === "tool") { + aiMessages.push({ + role: "user", + content: `Tool result for ${m.toolName}: ${m.content}`, + }); + } else { + aiMessages.push({ role: m.role, content: m.content }); + } + } + + const aiTools = + tools.length > 0 + ? tools.map((t) => ({ + type: "function" as const, + function: { + name: t.name, + description: t.description, + parameters: t.parameters, + }, + })) + : undefined; + + const result = (await env.AI.run( + model as keyof AiModels, + { + messages: aiMessages, + ...(aiTools && { tools: aiTools }), + stream: false, + } as any + )) as any; + + const choice = result?.choices?.[0]?.message; + const content: string = choice?.content ?? ""; + const toolCalls: LLMResponse["toolCalls"] = []; + if (choice?.tool_calls) { + for (const tc of choice.tool_calls) { + toolCalls.push({ + id: tc.id ?? `wai_${geminiCallId()}`, + name: tc.function.name, + arguments: safeJsonParse(tc.function.arguments), + }); + } + } + + const usage = result?.usage; + return { + content, + toolCalls, + inputTokens: usage?.prompt_tokens ?? 0, + outputTokens: usage?.completion_tokens ?? 0, + }; +} + +// ── Helpers ────────────────────────────────────────────────────────────────── + +/** + * Generates a synthetic id for providers that don't return tool-call ids. + * Uses crypto.randomUUID() — Date.now()/Math.random() are unavailable in some + * runtime sandboxes and need not be used here. + */ +function geminiCallId(): string { + return crypto.randomUUID().replace(/-/g, "").slice(0, 16); +} + +export function safeJsonParse(value: unknown): Record { + if (typeof value === "string") { + try { + return JSON.parse(value); + } catch { + return { raw: value }; + } + } + if (typeof value === "object" && value !== null) { + return value as Record; + } + return {}; +} diff --git a/apps/api/src/durable-objects/agent-runner.ts b/apps/api/src/durable-objects/agent-runner.ts index 785da708..e286a7d9 100644 --- a/apps/api/src/durable-objects/agent-runner.ts +++ b/apps/api/src/durable-objects/agent-runner.ts @@ -8,9 +8,7 @@ * Supports four LLM providers: anthropic, google, openai, workers-ai. */ -import Anthropic from "@anthropic-ai/sdk"; import type { ToolDefinition, ToolReference } from "@dafthunk/runtime"; -import { NodeToolProvider } from "@dafthunk/runtime"; import type { AgentProvider } from "@dafthunk/runtime/nodes/agent/base-agent-node"; import type { AgentLoopResult, @@ -19,32 +17,18 @@ import type { LLMResponse, } from "@dafthunk/runtime/utils/agent-loop"; import { runAgentLoop } from "@dafthunk/runtime/utils/agent-loop"; -import { - getAnthropicConfig, - getGoogleAIConfig, - getOpenAIConfig, -} from "@dafthunk/runtime/utils/ai-gateway"; -import { createCodeModeToolDefinition } from "@dafthunk/runtime/utils/code-mode"; -import { schemaToJsonSchema } from "@dafthunk/runtime/utils/schema-to-json-schema"; import type { TokenPricing } from "@dafthunk/runtime/utils/usage"; import { calculateTokenUsage } from "@dafthunk/runtime/utils/usage"; -import type { Schema } from "@dafthunk/types"; -import { GoogleGenAI } from "@google/genai"; import { Agent } from "agents"; -import OpenAI from "openai"; import type { Bindings } from "../context"; -import { CloudflareCredentialService } from "../runtime/cloudflare-credential-service"; -import { CloudflareDatabaseService } from "../runtime/cloudflare-database-service"; -import { CloudflareDatasetService } from "../runtime/cloudflare-dataset-service"; -import { CloudflareNodeRegistry } from "../runtime/cloudflare-node-registry"; +import { callAgentLLM } from "./agent-llm"; import { - buildPresignedUrlConfig, - CloudflareObjectStore, -} from "../runtime/cloudflare-object-store"; -import { CloudflareQueueService } from "../runtime/cloudflare-queue-service"; -import { createCodeModeExecutor } from "../runtime/code-mode-executor"; -import { createToolContext } from "../runtime/tool-context"; + applyCodeMode, + buildNodeToolProvider, + resolveTools, + toJsonSchema, +} from "./agent-services"; // ── Request / Response types ───────────────────────────────────────────── @@ -140,34 +124,6 @@ export class AgentRunner extends Agent { this.initialized = true; } - // ── Tool provider setup ────────────────────────────────────────────── - - private async buildNodeToolProvider( - organizationId: string - ): Promise { - const nodeRegistry = new CloudflareNodeRegistry(this.env, false); - const objectStore = new CloudflareObjectStore( - this.env.RESSOURCES, - buildPresignedUrlConfig(this.env) - ); - const credentialService = new CloudflareCredentialService(this.env); - await credentialService.initialize(organizationId); - const databaseService = new CloudflareDatabaseService(this.env); - const datasetService = new CloudflareDatasetService(this.env); - const queueService = new CloudflareQueueService(this.env); - - return new NodeToolProvider(nodeRegistry, (nodeId, inputs) => - createToolContext( - nodeId, - inputs, - this.env, - objectStore, - credentialService, - { databaseService, datasetService, queueService } - ) - ); - } - // ── Main handler ─────────────────────────────────────────────────────── private async handleRun(request: Request): Promise { @@ -196,16 +152,15 @@ export class AgentRunner extends Agent { runId ); - const nodeToolProvider = await this.buildNodeToolProvider( + const nodeToolProvider = await buildNodeToolProvider( + this.env, body.organizationId ); // Resolve tool definitions from references and apply code mode - const resolvedTools = await this.resolveTools( - body.tools, - nodeToolProvider - ); - const toolDefinitions = this.applyCodeMode( + const resolvedTools = await resolveTools(body.tools, nodeToolProvider); + const toolDefinitions = applyCodeMode( + this.env, resolvedTools, body.codeMode ?? false ); @@ -215,9 +170,6 @@ export class AgentRunner extends Agent { ? `Context:\n${body.context}\n\nRequest:\n${body.input}` : body.input; - // Build built-in Gemini tools (only effective for google provider) - const geminiBuiltInTools = this.buildGeminiBuiltInTools(body); - // Build resume state from persisted conversation (if stateful) const resumeState = this.buildResumeState( body.agentId, @@ -225,42 +177,14 @@ export class AgentRunner extends Agent { body.maxHistory ?? 50 ); - // Convert schema if provided - const jsonSchema = - body.schema && - typeof body.schema === "object" && - "fields" in body.schema - ? schemaToJsonSchema(body.schema as unknown as Schema) - : undefined; - - // Build callFinalLLM that applies schema constraint on the final turn - const callFinalLLM = jsonSchema - ? (messages: AgentMessage[], tools: ToolDefinition[]) => - this.callLLM( - body.provider, - body.model, - body.instructions, - messages, - tools, - geminiBuiltInTools, - jsonSchema - ) - : undefined; + const { callLLM, callFinalLLM } = this.buildLlmCallbacks(body); // Run the agent loop const result = await runAgentLoop({ userMessage, tools: toolDefinitions, maxSteps: body.maxSteps, - callLLM: (messages, tools) => - this.callLLM( - body.provider, - body.model, - body.instructions, - messages, - tools, - geminiBuiltInTools - ), + callLLM, callFinalLLM, onStepComplete: async (state) => { this.ctx.storage.sql.exec( @@ -272,6 +196,12 @@ export class AgentRunner extends Agent { resumeState, }); + // This runner configures no suspending tools, so the loop always + // completes; narrow the union for the persistence/response below. + if (result.status === "suspended") { + throw new Error("Agent loop suspended unexpectedly"); + } + // Persist conversation state for stateful sessions const agentMessages = this.persistConversationState( body.agentId, @@ -389,15 +319,14 @@ export class AgentRunner extends Agent { const { runId, executionInstanceId, nodeId, pricing } = body; try { - const nodeToolProvider = await this.buildNodeToolProvider( + const nodeToolProvider = await buildNodeToolProvider( + this.env, body.organizationId ); - const resolvedTools = await this.resolveTools( - body.tools, - nodeToolProvider - ); - const toolDefinitions = this.applyCodeMode( + const resolvedTools = await resolveTools(body.tools, nodeToolProvider); + const toolDefinitions = applyCodeMode( + this.env, resolvedTools, body.codeMode ?? false ); @@ -406,8 +335,6 @@ export class AgentRunner extends Agent { ? `Context:\n${body.context}\n\nRequest:\n${body.input}` : body.input; - const geminiBuiltInTools = this.buildGeminiBuiltInTools(body); - // Build resume state from persisted conversation (if stateful) const resumeState = this.buildResumeState( body.agentId, @@ -415,41 +342,13 @@ export class AgentRunner extends Agent { body.maxHistory ?? 50 ); - // Convert schema if provided - const jsonSchema = - body.schema && - typeof body.schema === "object" && - "fields" in body.schema - ? schemaToJsonSchema(body.schema as unknown as Schema) - : undefined; - - // Build callFinalLLM that applies schema constraint on the final turn - const callFinalLLM = jsonSchema - ? (messages: AgentMessage[], tools: ToolDefinition[]) => - this.callLLM( - body.provider, - body.model, - body.instructions, - messages, - tools, - geminiBuiltInTools, - jsonSchema - ) - : undefined; + const { callLLM, callFinalLLM } = this.buildLlmCallbacks(body); const result = await runAgentLoop({ userMessage, tools: toolDefinitions, maxSteps: body.maxSteps, - callLLM: (messages, tools) => - this.callLLM( - body.provider, - body.model, - body.instructions, - messages, - tools, - geminiBuiltInTools - ), + callLLM, callFinalLLM, onStepComplete: async (state) => { this.ctx.storage.sql.exec( @@ -461,6 +360,12 @@ export class AgentRunner extends Agent { resumeState, }); + // This runner configures no suspending tools, so the loop always + // completes; narrow the union for the persistence/response below. + if (result.status === "suspended") { + throw new Error("Agent loop suspended unexpectedly"); + } + // Persist conversation state for stateful sessions const agentMessages = this.persistConversationState( body.agentId, @@ -634,488 +539,44 @@ export class AgentRunner extends Agent { return tools; } - // ── Code Mode wrapping ───────────────────────────────────────────────── + // ── LLM dispatch ────────────────────────────────────────────────────── /** - * If code mode is enabled and tools are available, wraps all tool - * definitions into a single "codemode" tool. Falls back to the - * original tools when the executor binding is unavailable. + * Build the loop's `callLLM`/`callFinalLLM` callbacks for a request. The + * final-turn callback is only present when a structured-output schema is + * supplied; both dispatch through the shared `callAgentLLM`. */ - private applyCodeMode( - tools: ToolDefinition[], - codeMode: boolean - ): ToolDefinition[] { - if (!codeMode || tools.length === 0) return tools; - - const executor = createCodeModeExecutor(this.env); - if (!executor) { - console.warn( - "Code mode requested but LOADER binding is unavailable — falling back to standard tool calling" - ); - return tools; - } - - return [createCodeModeToolDefinition(tools, executor)]; - } - - // ── Tool resolution ──────────────────────────────────────────────────── - - private async resolveTools( - toolRefs: ToolReference[], - nodeToolProvider: NodeToolProvider - ): Promise { - if (!toolRefs || toolRefs.length === 0) return []; - - const definitions: ToolDefinition[] = []; - for (const ref of toolRefs) { - try { - const def = await nodeToolProvider.getToolDefinition( - ref.identifier, - ref.config - ); - definitions.push(def); - } catch (error) { - console.warn(`Failed to resolve tool ${ref.identifier}:`, error); - } - } - return definitions; - } - - // ── LLM dispatch ────────────────────────────────────────────────────── - - private async callLLM( - provider: AgentProvider, - model: string, - instructions: string, - messages: AgentMessage[], - tools: ToolDefinition[], - builtInTools?: Record[], - schema?: Record - ): Promise { - switch (provider) { - case "anthropic": - return this.callAnthropic(model, instructions, messages, tools, schema); - case "google": - return this.callGoogle( - model, - instructions, - messages, - tools, - builtInTools, - schema - ); - case "openai": - return this.callOpenAI(model, instructions, messages, tools, schema); - case "workers-ai": - return this.callWorkersAI(model, instructions, messages, tools, schema); - default: - throw new Error(`Unsupported provider: ${provider}`); - } - } - - // ── Anthropic ────────────────────────────────────────────────────────── - - private async callAnthropic( - model: string, - instructions: string, - messages: AgentMessage[], - tools: ToolDefinition[], - schema?: Record - ): Promise { - const client = new Anthropic({ - apiKey: "gateway-managed", - timeout: 120_000, - ...getAnthropicConfig(this.env), - }); - - // Convert generic messages to Anthropic format - const anthropicMessages: Anthropic.MessageParam[] = messages.map((m) => { - if (m.role === "assistant" && m.toolCalls?.length) { - return { - role: "assistant" as const, - content: [ - ...(m.content ? [{ type: "text" as const, text: m.content }] : []), - ...m.toolCalls.map((tc) => ({ - type: "tool_use" as const, - id: tc.id, - name: tc.name, - input: tc.arguments, - })), - ], - }; - } - if (m.role === "tool") { - return { - role: "user" as const, - content: [ - { - type: "tool_result" as const, - tool_use_id: m.toolCallId!, - content: m.content, - }, - ], - }; - } - return { - role: m.role === "user" ? ("user" as const) : ("assistant" as const), - content: m.content, - }; - }); - - // Convert tools to Anthropic format - const anthropicTools: Anthropic.Tool[] = tools.map((t) => ({ - name: t.name, - description: t.description, - input_schema: t.parameters as Anthropic.Tool.InputSchema, - })); - - // When schema is provided, append a JSON constraint to the system prompt - const systemPrompt = schema - ? `${instructions}\n\nYou MUST respond with valid JSON matching this schema:\n${JSON.stringify(schema)}` - : instructions; - - const response = await client.messages.create({ - model, - max_tokens: 4096, - messages: anthropicMessages, - ...(systemPrompt && { system: systemPrompt }), - ...(anthropicTools.length > 0 && { tools: anthropicTools }), - }); - - // Parse response - let content = ""; - const toolCalls: LLMResponse["toolCalls"] = []; - - for (const block of response.content) { - if (block.type === "text") { - content += block.text; - } else if (block.type === "tool_use") { - toolCalls.push({ - id: block.id, - name: block.name, - arguments: block.input as Record, - }); - } - } - - return { - content, - toolCalls, - inputTokens: response.usage.input_tokens, - outputTokens: response.usage.output_tokens, - }; - } - - // ── Google (Gemini) ──────────────────────────────────────────────────── - - private async callGoogle( - model: string, - instructions: string, - messages: AgentMessage[], - tools: ToolDefinition[], - builtInTools?: Record[], - schema?: Record - ): Promise { - const ai = new GoogleGenAI({ - apiKey: "gateway-managed", - ...getGoogleAIConfig(this.env), - }); - - // Build contents from message history - const contents: Array<{ - role: string; - parts: Array>; - }> = []; - for (const m of messages) { - if (m.role === "user") { - contents.push({ role: "user", parts: [{ text: m.content }] }); - } else if (m.role === "assistant") { - const parts: Array> = []; - if (m.content) parts.push({ text: m.content }); - if (m.toolCalls) { - for (const tc of m.toolCalls) { - parts.push({ - functionCall: { name: tc.name, args: tc.arguments }, - ...(tc.thoughtSignature && { - thoughtSignature: tc.thoughtSignature, - }), - }); - } - } - contents.push({ role: "model", parts }); - } else if (m.role === "tool") { - contents.push({ - role: "user", - parts: [ - { - functionResponse: { - name: m.toolName, - response: safeJsonParse(m.content), - }, - }, - ], - }); - } - } - - // Convert tools to Gemini format - const functionDeclarations = tools.map((t) => ({ - name: t.name, - description: t.description, - parameters: t.parameters, - })); - - const config: Record = {}; - const allTools: Record[] = [...(builtInTools ?? [])]; - if (functionDeclarations.length > 0) { - allTools.push({ functionDeclarations }); - } - if (allTools.length > 0) { - config.tools = allTools; - } - - // Apply schema constraint for structured JSON output - if (schema) { - config.responseMimeType = "application/json"; - config.responseSchema = schema; - } - - const response = await ai.models.generateContent({ - model, - contents: contents as any, - config: config as any, - ...(instructions && { systemInstruction: instructions }), - }); - - // Parse response - let content = ""; - const toolCalls: LLMResponse["toolCalls"] = []; - - if (response.candidates?.[0]?.content?.parts) { - for (const part of response.candidates[0].content.parts as any[]) { - if (part.text) { - content += part.text; - } - if (part.functionCall) { - toolCalls.push({ - id: `gemini_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`, - name: part.functionCall.name, - arguments: part.functionCall.args ?? {}, - ...(part.thoughtSignature && { - thoughtSignature: part.thoughtSignature, - }), - }); - } - } - } - - const usage = response.usageMetadata; - return { - content, - toolCalls, - inputTokens: usage?.promptTokenCount ?? 0, - outputTokens: usage?.candidatesTokenCount ?? 0, - }; - } - - // ── OpenAI ───────────────────────────────────────────────────────────── - - private async callOpenAI( - model: string, - instructions: string, - messages: AgentMessage[], - tools: ToolDefinition[], - schema?: Record - ): Promise { - const client = new OpenAI({ - apiKey: "gateway-managed", - timeout: 120_000, - ...getOpenAIConfig(this.env), - }); - - // Convert generic messages to OpenAI format - const openaiMessages: OpenAI.ChatCompletionMessageParam[] = []; - - if (instructions) { - openaiMessages.push({ role: "system", content: instructions }); - } - - for (const m of messages) { - if (m.role === "user") { - openaiMessages.push({ role: "user", content: m.content }); - } else if (m.role === "assistant") { - openaiMessages.push({ - role: "assistant", - content: m.content || null, - ...(m.toolCalls?.length && { - tool_calls: m.toolCalls.map((tc) => ({ - id: tc.id, - type: "function" as const, - function: { - name: tc.name, - arguments: JSON.stringify(tc.arguments), - }, - })), - }), - }); - } else if (m.role === "tool") { - openaiMessages.push({ - role: "tool", - tool_call_id: m.toolCallId!, - content: m.content, - }); - } - } - - // Convert tools to OpenAI format - const openaiTools: OpenAI.ChatCompletionTool[] = tools.map((t) => ({ - type: "function" as const, - function: { - name: t.name, - description: t.description, - parameters: t.parameters, - }, - })); - - // Build response_format when a schema is provided - const responseFormat = schema - ? { - type: "json_schema" as const, - json_schema: { - name: "response", - schema, - strict: true, - }, - } - : undefined; - - const completion = await client.chat.completions.create({ - model, - max_tokens: 4096, - messages: openaiMessages, - ...(openaiTools.length > 0 && { tools: openaiTools }), - ...(responseFormat && { response_format: responseFormat }), - }); - - const choice = completion.choices[0]; - const content = choice?.message?.content ?? ""; - const toolCalls: LLMResponse["toolCalls"] = []; - - if (choice?.message?.tool_calls) { - for (const tc of choice.message.tool_calls) { - if (tc.type === "function") { - toolCalls.push({ - id: tc.id, - name: tc.function.name, - arguments: safeJsonParse(tc.function.arguments), - }); - } - } - } - - return { - content, - toolCalls, - inputTokens: completion.usage?.prompt_tokens ?? 0, - outputTokens: completion.usage?.completion_tokens ?? 0, - }; - } - - // ── Workers AI ───────────────────────────────────────────────────────── - - private async callWorkersAI( - model: string, - _instructions: string, - messages: AgentMessage[], - tools: ToolDefinition[], - schema?: Record - ): Promise { - // Workers AI uses OpenAI-compatible chat format - const aiMessages: Array<{ role: string; content: string }> = []; - - // When schema is provided, append a JSON constraint to the system prompt - // (Workers AI models don't reliably support response_format) - const systemPrompt = schema - ? `${_instructions}\n\nYou MUST respond with valid JSON matching this schema:\n${JSON.stringify(schema)}` - : _instructions; - - if (systemPrompt) { - aiMessages.push({ role: "system", content: systemPrompt }); - } - - for (const m of messages) { - if (m.role === "tool") { - // Workers AI doesn't have a native tool role — inject as user message - aiMessages.push({ - role: "user", - content: `Tool result for ${m.toolName}: ${m.content}`, - }); - } else { - aiMessages.push({ role: m.role, content: m.content }); - } - } - - // Workers AI tool format (OpenAI-compatible) - const aiTools = - tools.length > 0 - ? tools.map((t) => ({ - type: "function" as const, - function: { - name: t.name, - description: t.description, - parameters: t.parameters, - }, - })) - : undefined; - - const result = (await this.env.AI.run( - model as keyof AiModels, - { - messages: aiMessages, - ...(aiTools && { tools: aiTools }), - stream: false, - } as any - )) as any; - - // Workers AI returns OpenAI chat-completions format - const choice = result?.choices?.[0]?.message; - const content: string = choice?.content ?? ""; - const toolCalls: LLMResponse["toolCalls"] = []; - - if (choice?.tool_calls) { - for (const tc of choice.tool_calls) { - toolCalls.push({ - id: - tc.id ?? - `wai_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`, - name: tc.function.name, - arguments: safeJsonParse(tc.function.arguments), - }); - } - } - - const usage = result?.usage; + private buildLlmCallbacks(body: AgentRunRequest): { + callLLM: ( + messages: AgentMessage[], + tools: ToolDefinition[] + ) => Promise; + callFinalLLM?: ( + messages: AgentMessage[], + tools: ToolDefinition[] + ) => Promise; + } { + const builtInTools = this.buildGeminiBuiltInTools(body); + const jsonSchema = toJsonSchema(body.schema); + const llm = ( + messages: AgentMessage[], + tools: ToolDefinition[], + schema?: Record + ) => + callAgentLLM(this.env, { + provider: body.provider, + model: body.model, + instructions: body.instructions, + messages, + tools, + builtInTools, + schema, + }); return { - content, - toolCalls, - inputTokens: usage?.prompt_tokens ?? 0, - outputTokens: usage?.completion_tokens ?? 0, + callLLM: (messages, tools) => llm(messages, tools), + callFinalLLM: jsonSchema + ? (messages, tools) => llm(messages, tools, jsonSchema) + : undefined, }; } } - -// ── Helpers ────────────────────────────────────────────────────────────── - -function safeJsonParse(value: unknown): Record { - if (typeof value === "string") { - try { - return JSON.parse(value); - } catch { - return { raw: value }; - } - } - if (typeof value === "object" && value !== null) { - return value as Record; - } - return {}; -} diff --git a/apps/api/src/durable-objects/agent-services.ts b/apps/api/src/durable-objects/agent-services.ts new file mode 100644 index 00000000..72325f35 --- /dev/null +++ b/apps/api/src/durable-objects/agent-services.ts @@ -0,0 +1,106 @@ +/** + * Shared service wiring for the agent Durable Objects. + * + * Builds the node-backed tool provider and resolves/wraps tool definitions — + * the parts AgentRunner and EmailAgentRunner have in common. + */ + +import type { ToolDefinition, ToolReference } from "@dafthunk/runtime"; +import { NodeToolProvider } from "@dafthunk/runtime"; +import { createCodeModeToolDefinition } from "@dafthunk/runtime/utils/code-mode"; +import { schemaToJsonSchema } from "@dafthunk/runtime/utils/schema-to-json-schema"; +import type { Schema } from "@dafthunk/types"; + +import type { Bindings } from "../context"; +import { CloudflareCredentialService } from "../runtime/cloudflare-credential-service"; +import { CloudflareDatabaseService } from "../runtime/cloudflare-database-service"; +import { CloudflareDatasetService } from "../runtime/cloudflare-dataset-service"; +import { CloudflareNodeRegistry } from "../runtime/cloudflare-node-registry"; +import { + buildPresignedUrlConfig, + CloudflareObjectStore, +} from "../runtime/cloudflare-object-store"; +import { CloudflareQueueService } from "../runtime/cloudflare-queue-service"; +import { createCodeModeExecutor } from "../runtime/code-mode-executor"; +import { createToolContext } from "../runtime/tool-context"; + +/** Build a NodeToolProvider scoped to an organization's credentials/services. */ +export async function buildNodeToolProvider( + env: Bindings, + organizationId: string +): Promise { + const nodeRegistry = new CloudflareNodeRegistry(env, false); + const objectStore = new CloudflareObjectStore( + env.RESSOURCES, + buildPresignedUrlConfig(env) + ); + const credentialService = new CloudflareCredentialService(env); + await credentialService.initialize(organizationId); + const databaseService = new CloudflareDatabaseService(env); + const datasetService = new CloudflareDatasetService(env); + const queueService = new CloudflareQueueService(env); + + return new NodeToolProvider(nodeRegistry, (nodeId, inputs) => + createToolContext(nodeId, inputs, env, objectStore, credentialService, { + databaseService, + datasetService, + queueService, + }) + ); +} + +/** Resolve tool references into executable tool definitions. */ +export async function resolveTools( + toolRefs: ToolReference[], + provider: NodeToolProvider +): Promise { + if (!toolRefs || toolRefs.length === 0) return []; + + const definitions: ToolDefinition[] = []; + for (const ref of toolRefs) { + try { + const def = await provider.getToolDefinition(ref.identifier, ref.config); + definitions.push(def); + } catch (error) { + console.warn(`Failed to resolve tool ${ref.identifier}:`, error); + } + } + return definitions; +} + +/** + * When code mode is enabled, collapse all tools into a single "codemode" tool. + * Falls back to the original tools when the executor binding is unavailable. + */ +export function applyCodeMode( + env: Bindings, + tools: ToolDefinition[], + codeMode: boolean +): ToolDefinition[] { + if (!codeMode || tools.length === 0) return tools; + + const executor = createCodeModeExecutor(env); + if (!executor) { + console.warn( + "Code mode requested but LOADER binding is unavailable — falling back to standard tool calling" + ); + return tools; + } + + return [createCodeModeToolDefinition(tools, executor)]; +} + +/** + * Convert a node `Schema` into a JSON schema for structured output. Inputs that + * are already JSON schemas (no `fields` property) pass through unchanged; + * `undefined`/non-objects yield `undefined`. + */ +export function toJsonSchema( + schema: Record | undefined +): Record | undefined { + if (!schema || typeof schema !== "object") return undefined; + if ("fields" in schema) { + return schemaToJsonSchema(schema as unknown as Schema); + } + return schema; +} diff --git a/apps/api/src/durable-objects/email-agent-barrier.test.ts b/apps/api/src/durable-objects/email-agent-barrier.test.ts new file mode 100644 index 00000000..caa3726c --- /dev/null +++ b/apps/api/src/durable-objects/email-agent-barrier.test.ts @@ -0,0 +1,88 @@ +import { describe, expect, it } from "vitest"; + +import { + allSettled, + nextWaitingDeadline, + type PendingAsk, + settleExpired, + settleReply, + toResumeResults, +} from "./email-agent-barrier"; + +const SENTINEL = "(no reply received)"; + +function ask(overrides: Partial): PendingAsk { + return { + toolCallId: "tc", + interlocutorId: "alice", + threadId: "t1", + deadline: 1000, + status: "waiting", + ...overrides, + }; +} + +describe("email-agent-barrier", () => { + describe("settleReply", () => { + it("settles the waiting ask on a matching thread", () => { + const pending = [ + ask({ toolCallId: "a", threadId: "t1" }), + ask({ toolCallId: "b", threadId: "t2" }), + ]; + expect(settleReply(pending, "t1", "hello")).toBe(true); + expect(pending[0]).toMatchObject({ status: "settled", result: "hello" }); + expect(pending[1].status).toBe("waiting"); + }); + + it("returns false when no waiting ask owns the thread", () => { + const pending = [ask({ threadId: "t1", status: "settled" })]; + expect(settleReply(pending, "t1", "x")).toBe(false); + expect(settleReply(pending, "tX", "x")).toBe(false); + }); + }); + + describe("settleExpired", () => { + it("settles only waiting asks past their deadline with the sentinel", () => { + const pending = [ + ask({ toolCallId: "a", deadline: 500 }), + ask({ toolCallId: "b", deadline: 5000 }), + ask({ toolCallId: "c", deadline: 100, status: "settled", result: "r" }), + ]; + const expired = settleExpired(pending, 1000, SENTINEL); + expect(expired.map((p) => p.toolCallId)).toEqual(["a"]); + expect(pending[0]).toMatchObject({ status: "settled", result: SENTINEL }); + expect(pending[1].status).toBe("waiting"); + expect(pending[2].result).toBe("r"); // untouched + }); + }); + + describe("allSettled / nextWaitingDeadline", () => { + it("reports completion and the earliest remaining deadline", () => { + const pending = [ + ask({ deadline: 3000, status: "settled" }), + ask({ deadline: 2000 }), + ask({ deadline: 4000 }), + ]; + expect(allSettled(pending)).toBe(false); + expect(nextWaitingDeadline(pending)).toBe(2000); + + pending[1].status = "settled"; + pending[2].status = "settled"; + expect(allSettled(pending)).toBe(true); + expect(nextWaitingDeadline(pending)).toBeUndefined(); + }); + }); + + describe("toResumeResults", () => { + it("maps every ask in order, using the sentinel when unanswered", () => { + const pending = [ + ask({ toolCallId: "a", status: "settled", result: "reply-a" }), + ask({ toolCallId: "b", status: "settled" }), // no result + ]; + expect(toResumeResults(pending, "ask_interlocutor", SENTINEL)).toEqual([ + { toolCallId: "a", toolName: "ask_interlocutor", content: "reply-a" }, + { toolCallId: "b", toolName: "ask_interlocutor", content: SENTINEL }, + ]); + }); + }); +}); diff --git a/apps/api/src/durable-objects/email-agent-barrier.ts b/apps/api/src/durable-objects/email-agent-barrier.ts new file mode 100644 index 00000000..af4146be --- /dev/null +++ b/apps/api/src/durable-objects/email-agent-barrier.ts @@ -0,0 +1,86 @@ +/** + * Pure barrier bookkeeping for EmailAgentRunner. + * + * A fan-out of `ask_interlocutor` calls parks the agent loop until every ask is + * settled — answered by a reply or expired by its deadline. These helpers own + * that settlement math so it can be reasoned about and tested without the + * Durable Object's bindings. + */ + +import type { ResolvedToolResult } from "@dafthunk/runtime/utils/agent-loop"; + +export interface PendingAsk { + toolCallId: string; + interlocutorId: string; + threadId: string; + deadline: number; + status: "waiting" | "settled"; + result?: string; +} + +/** + * Settle the first waiting ask on a thread with a reply. Returns true when an + * ask matched (mutates it in place); false when no waiting ask owns the thread. + */ +export function settleReply( + pending: PendingAsk[], + threadId: string, + text: string +): boolean { + const ask = pending.find( + (p) => p.threadId === threadId && p.status === "waiting" + ); + if (!ask) return false; + ask.status = "settled"; + ask.result = text; + return true; +} + +/** + * Settle every waiting ask whose deadline has passed with a sentinel result. + * Returns the asks that were just expired (mutated in place). + */ +export function settleExpired( + pending: PendingAsk[], + now: number, + sentinel: string +): PendingAsk[] { + const expired: PendingAsk[] = []; + for (const p of pending) { + if (p.status === "waiting" && p.deadline <= now) { + p.status = "settled"; + p.result = sentinel; + expired.push(p); + } + } + return expired; +} + +/** True when no ask is still waiting (the barrier is complete). */ +export function allSettled(pending: PendingAsk[]): boolean { + return pending.every((p) => p.status === "settled"); +} + +/** Earliest deadline among still-waiting asks, or undefined if none remain. */ +export function nextWaitingDeadline(pending: PendingAsk[]): number | undefined { + const deadlines = pending + .filter((p) => p.status === "waiting") + .map((p) => p.deadline); + return deadlines.length > 0 ? Math.min(...deadlines) : undefined; +} + +/** + * Build the tool results that resume the suspended loop — one per ask, in the + * original order, falling back to the sentinel for any without a recorded result. + */ +export function toResumeResults( + pending: PendingAsk[], + toolName: string, + sentinel: string +): ResolvedToolResult[] { + return pending.map((p) => ({ + toolCallId: p.toolCallId, + toolName, + content: p.result ?? sentinel, + })); +} diff --git a/apps/api/src/durable-objects/email-agent-flow.test.ts b/apps/api/src/durable-objects/email-agent-flow.test.ts new file mode 100644 index 00000000..fcc7484c --- /dev/null +++ b/apps/api/src/durable-objects/email-agent-flow.test.ts @@ -0,0 +1,384 @@ +/** + * Integration test for the email-agent conversation flow. + * + * Drives the *real* agent loop ({@link runAgentLoop}) and the *real* barrier + * helpers through a small harness that mirrors EmailAgentRunner's orchestration, + * stubbing only the I/O (the LLM and email sending). This exercises the path + * that matters: a parallel fan-out of questions parks the loop; replies and + * deadline timeouts settle the barrier; once complete the loop resumes with the + * collected results and runs to completion. + */ + +import type { ToolDefinition } from "@dafthunk/runtime"; +import type { + AgentLoopState, + AgentMessage, + AgentToolCall, + LLMResponse, +} from "@dafthunk/runtime/utils/agent-loop"; +import { runAgentLoop } from "@dafthunk/runtime/utils/agent-loop"; +import { describe, expect, it } from "vitest"; + +import { + allSettled, + type PendingAsk, + settleExpired, + settleReply, + toResumeResults, +} from "./email-agent-barrier"; + +const ASK = "ask_interlocutor"; +const SENTINEL = "(no reply received before the deadline)"; +const TIMEOUT = 1000; + +interface Interlocutor { + id: string; + email: string; +} + +interface Exchange { + interlocutorId: string; + threadId: string; + message: string; + reply?: string; + timedOut?: boolean; +} + +interface SentEmail { + to: string; + text: string; + threadId: string; +} + +/** Harness mirroring EmailAgentRunner's drive loop with stubbed I/O. */ +class EmailAgentHarness { + state?: AgentLoopState; + pending: PendingAsk[] = []; + exchanges: Exchange[] = []; + outbox: SentEmail[] = []; + completion?: { + result: string; + finishReason: string; + rounds: number; + transcript: Exchange[]; + }; + private threadSeq = 0; + + constructor( + private readonly cfg: { + objective: string; + interlocutors: Interlocutor[]; + maxRounds: number; + callLLM: ( + messages: AgentMessage[], + tools: ToolDefinition[] + ) => Promise; + } + ) {} + + private askTool(): ToolDefinition { + return { + name: ASK, + description: "Email an interlocutor and wait for their reply.", + parameters: { type: "object", properties: {} }, + function: async () => SENTINEL, // never invoked (suspending) + }; + } + + async start(): Promise { + await this.runLoop(); + } + + private async runLoop(resume?: ReturnType) { + const outcome = await runAgentLoop({ + userMessage: this.cfg.objective, + tools: [this.askTool()], + maxSteps: this.cfg.maxRounds, + callLLM: this.cfg.callLLM, + isSuspendingTool: (n) => n === ASK, + ...(this.state ? { resumeState: this.state } : {}), + ...(resume ? { resumeToolResults: resume } : {}), + }); + + if (outcome.status === "suspended") { + this.state = outcome.state; + this.parkOnAsks(outcome.pendingToolCalls); + return; + } + + this.completion = { + result: outcome.text, + finishReason: + outcome.finishReason === "max_steps_reached" + ? "max_rounds" + : outcome.finishReason === "error" + ? "error" + : "goal_reached", + rounds: outcome.totalSteps, + transcript: this.exchanges, + }; + } + + private parkOnAsks(asks: AgentToolCall[]): void { + for (const a of asks) { + const who = this.cfg.interlocutors.find( + (i) => i.id === String(a.arguments.interlocutor) + ); + if (!who) { + this.pending.push({ + toolCallId: a.id, + interlocutorId: String(a.arguments.interlocutor), + threadId: "", + deadline: TIMEOUT, + status: "settled", + result: "(unknown interlocutor)", + }); + continue; + } + const threadId = `thread-${++this.threadSeq}`; + const message = String(a.arguments.message); + this.outbox.push({ to: who.email, text: message, threadId }); + this.exchanges.push({ interlocutorId: who.id, threadId, message }); + this.pending.push({ + toolCallId: a.id, + interlocutorId: who.id, + threadId, + deadline: TIMEOUT, + status: "waiting", + }); + } + } + + /** Simulate an inbound reply on a thread (mirrors DO.deliverReply). */ + async deliverReply(threadId: string, text: string): Promise { + if (!settleReply(this.pending, threadId, text)) return false; + this.recordReply(threadId, text, false); + if (allSettled(this.pending)) await this.resume(); + return true; + } + + /** Simulate the per-reply deadline firing (mirrors DO.alarm). */ + async fireTimeout(now: number): Promise { + const expired = settleExpired(this.pending, now, SENTINEL); + for (const p of expired) this.recordReply(p.threadId, SENTINEL, true); + if (allSettled(this.pending)) await this.resume(); + } + + private async resume(): Promise { + const resume = toResumeResults(this.pending, ASK, SENTINEL); + this.pending = []; + await this.runLoop(resume); + } + + private recordReply( + threadId: string, + reply: string, + timedOut: boolean + ): void { + for (let i = this.exchanges.length - 1; i >= 0; i--) { + const e = this.exchanges[i]; + if (e.threadId === threadId && e.reply === undefined) { + e.reply = reply; + if (timedOut) e.timedOut = true; + return; + } + } + } +} + +// ── Scripted LLM helpers ───────────────────────────────────────────────────── + +function call( + id: string, + interlocutor: string, + message: string +): AgentToolCall { + return { id, name: ASK, arguments: { interlocutor, message } }; +} + +function scripted(responses: LLMResponse[]) { + let i = 0; + const seen: AgentMessage[][] = []; + const fn = async (messages: AgentMessage[]): Promise => { + seen.push(structuredClone(messages)); + const r = responses[i]; + if (!r) throw new Error(`LLM out of script at call ${i}`); + i += 1; + return r; + }; + return { fn, seen }; +} + +const INTERLOCUTORS: Interlocutor[] = [ + { id: "alice", email: "alice@example.com" }, + { id: "bob", email: "bob@example.com" }, +]; + +describe("email agent flow (loop + barrier integration)", () => { + it("fans out to two interlocutors; one replies, one times out, then completes", async () => { + const llm = scripted([ + // Turn 1: ask both in parallel. + { + content: "", + toolCalls: [ + call("c-alice", "alice", "Are you available Tuesday?"), + call("c-bob", "bob", "Are you available Tuesday?"), + ], + inputTokens: 10, + outputTokens: 5, + }, + // Turn 2 (after barrier): wrap up. + { + content: "Scheduled with Alice for Tuesday.", + toolCalls: [], + inputTokens: 8, + outputTokens: 4, + }, + ]); + + const agent = new EmailAgentHarness({ + objective: "Find a meeting time", + interlocutors: INTERLOCUTORS, + maxRounds: 10, + callLLM: llm.fn, + }); + + await agent.start(); + + // Parked on the fan-out: two emails sent, two asks waiting, no completion. + expect(agent.outbox.map((e) => e.to)).toEqual([ + "alice@example.com", + "bob@example.com", + ]); + expect(agent.pending).toHaveLength(2); + expect(agent.completion).toBeUndefined(); + + // Alice replies — barrier not yet complete (Bob still outstanding). + const aliceThread = agent.outbox[0].threadId; + expect(await agent.deliverReply(aliceThread, "Yes, Tuesday works")).toBe( + true + ); + expect(agent.completion).toBeUndefined(); + + // Bob's deadline passes — barrier completes and the loop resumes. + await agent.fireTimeout(TIMEOUT); + + expect(agent.completion).toBeDefined(); + expect(agent.completion?.finishReason).toBe("goal_reached"); + expect(agent.completion?.result).toBe("Scheduled with Alice for Tuesday."); + expect(agent.completion?.rounds).toBe(1); + + // Transcript captures both exchanges, including the timeout sentinel. + const transcript = agent.completion!.transcript; + expect(transcript).toHaveLength(2); + expect(transcript[0]).toMatchObject({ + interlocutorId: "alice", + reply: "Yes, Tuesday works", + }); + expect(transcript[0].timedOut).toBeUndefined(); + expect(transcript[1]).toMatchObject({ + interlocutorId: "bob", + reply: SENTINEL, + timedOut: true, + }); + + // The resumed turn saw both tool results (reply + sentinel). + const resumeMessages = llm.seen[1]; + const toolContents = resumeMessages + .filter((m) => m.role === "tool") + .map((m) => m.content); + expect(toolContents).toContain("Yes, Tuesday works"); + expect(toolContents).toContain(SENTINEL); + }); + + it("resumes as soon as the last reply arrives (no timeout needed)", async () => { + const llm = scripted([ + { + content: "", + toolCalls: [ + call("c-alice", "alice", "ping"), + call("c-bob", "bob", "ping"), + ], + inputTokens: 1, + outputTokens: 1, + }, + { content: "Done.", toolCalls: [], inputTokens: 1, outputTokens: 1 }, + ]); + const agent = new EmailAgentHarness({ + objective: "Collect two answers", + interlocutors: INTERLOCUTORS, + maxRounds: 10, + callLLM: llm.fn, + }); + + await agent.start(); + await agent.deliverReply(agent.outbox[0].threadId, "a"); + expect(agent.completion).toBeUndefined(); // still waiting on Bob + await agent.deliverReply(agent.outbox[1].threadId, "b"); + + expect(agent.completion?.finishReason).toBe("goal_reached"); + expect(agent.completion?.result).toBe("Done."); + }); + + it("ignores a reply on an unknown thread", async () => { + const llm = scripted([ + { + content: "", + toolCalls: [call("c-alice", "alice", "ping")], + inputTokens: 1, + outputTokens: 1, + }, + { content: "ok", toolCalls: [], inputTokens: 1, outputTokens: 1 }, + ]); + const agent = new EmailAgentHarness({ + objective: "x", + interlocutors: INTERLOCUTORS, + maxRounds: 10, + callLLM: llm.fn, + }); + + await agent.start(); + expect(await agent.deliverReply("does-not-exist", "hi")).toBe(false); + expect(agent.completion).toBeUndefined(); + + // The real reply still resumes and completes. + expect(await agent.deliverReply(agent.outbox[0].threadId, "hi")).toBe(true); + expect(agent.completion?.result).toBe("ok"); + }); + + it("spans multiple rounds: follow up after the first answers", async () => { + const llm = scripted([ + // Round 1: ask alice. + { + content: "", + toolCalls: [call("c1", "alice", "first?")], + inputTokens: 1, + outputTokens: 1, + }, + // Round 2: follow up with bob after alice answers. + { + content: "", + toolCalls: [call("c2", "bob", "second?")], + inputTokens: 1, + outputTokens: 1, + }, + // Round 3: finish. + { content: "All set.", toolCalls: [], inputTokens: 1, outputTokens: 1 }, + ]); + const agent = new EmailAgentHarness({ + objective: "two-step", + interlocutors: INTERLOCUTORS, + maxRounds: 10, + callLLM: llm.fn, + }); + + await agent.start(); + await agent.deliverReply(agent.outbox[0].threadId, "alice says hi"); + expect(agent.completion).toBeUndefined(); // now parked on bob + expect(agent.outbox).toHaveLength(2); + + await agent.deliverReply(agent.outbox[1].threadId, "bob says hi"); + expect(agent.completion?.result).toBe("All set."); + expect(agent.completion?.rounds).toBe(2); + }); +}); diff --git a/apps/api/src/durable-objects/email-agent-runner.ts b/apps/api/src/durable-objects/email-agent-runner.ts new file mode 100644 index 00000000..41c427fe --- /dev/null +++ b/apps/api/src/durable-objects/email-agent-runner.ts @@ -0,0 +1,613 @@ +/** + * EmailAgentRunner Durable Object + * + * Drives an agent that pursues a goal by emailing one or more interlocutors and + * waiting — for days if needed — for their replies. The workflow node parks once + * on `email-agent-complete-${nodeId}` with a long timeout; this DO owns the + * multi-turn conversation in between. + * + * The agent's `ask_interlocutor` tool is a *suspending* tool (see + * {@link runAgentLoop}). When the model calls it — possibly several times in one + * turn, to question multiple interlocutors in parallel — the loop parks and this + * DO sends the emails, registers each thread for reply routing, and sets an + * alarm per reply deadline. Replies arrive via {@link deliverReply}; once every + * outstanding ask in the turn is settled (replied or timed out) the loop + * resumes. When the agent stops calling tools (goal reached) or hits the round + * limit, the DO sends the completion event and the workflow continues. + */ + +import { DurableObject } from "cloudflare:workers"; +import type { ToolDefinition, ToolReference } from "@dafthunk/runtime"; +import type { AgentProvider } from "@dafthunk/runtime/nodes/agent/base-agent-node"; +import type { + AgentLoopState, + AgentMessage, + FinishReason, + ResolvedToolResult, +} from "@dafthunk/runtime/utils/agent-loop"; +import { runAgentLoop } from "@dafthunk/runtime/utils/agent-loop"; +import type { TokenPricing } from "@dafthunk/runtime/utils/usage"; +import { calculateTokenUsage } from "@dafthunk/runtime/utils/usage"; + +import type { Bindings } from "../context"; +import { createDatabase, getEmailByHandle } from "../db"; +import { CloudflareMailboxService } from "../runtime/cloudflare-mailbox-service"; +import { callAgentLLM } from "./agent-llm"; +import { + applyCodeMode, + buildNodeToolProvider, + resolveTools, + toJsonSchema, +} from "./agent-services"; +import { + allSettled, + nextWaitingDeadline, + type PendingAsk, + settleExpired, + settleReply, + toResumeResults, +} from "./email-agent-barrier"; +import type { MailboxMessageRow } from "./mailbox-do"; + +const ASK_TOOL = "ask_interlocutor"; +const NO_REPLY_SENTINEL = "(no reply received before the deadline)"; + +// ── Request type ───────────────────────────────────────────────────────────── + +export interface EmailInterlocutor { + /** Stable id the agent uses to address this interlocutor. */ + id: string; + email: string; + name?: string; + role?: string; +} + +export interface EmailAgentRunRequest { + /** Unique run id — DO instance name, `${executionId}:${nodeId}`. */ + runId: string; + executionInstanceId: string; + nodeId: string; + provider: AgentProvider; + model: string; + pricing?: TokenPricing; + organizationId: string; + /** Org email handle (local part) to send from; resolved to an email id. */ + fromHandle: string; + interlocutors: EmailInterlocutor[]; + /** The goal the agent should pursue. */ + objective: string; + /** Persona / behavioural system prompt. */ + instructions?: string; + /** Initial material/context for the agent. */ + context?: string; + /** Default subject for newly opened threads. */ + subject?: string; + /** Max conversation rounds (fan-out turns) before forced wrap-up. */ + maxRounds: number; + /** How long to wait for each reply before filling a timeout sentinel. */ + replyTimeoutMs: number; + /** Extra synchronous tools the agent may call between emails. */ + tools?: ToolReference[]; + /** Schema constraining the final result (structured output). */ + schema?: Record; +} + +// ── Internal persisted shapes ──────────────────────────────────────────────── + +interface ThreadRef { + threadId: string; + subject: string; +} + +/** One interlocutor's thread, sourced from the mailbox at completion. */ +interface TranscriptThread { + interlocutorId: string; + threadId: string; + /** True when the interlocutor never replied before the deadline. */ + timedOut: boolean; + messages: MailboxMessageRow[]; +} + +type RunStatus = "running" | "waiting" | "completed" | "error"; + +// ── Durable Object ─────────────────────────────────────────────────────────── + +export class EmailAgentRunner extends DurableObject { + private get storage(): DurableObjectStorage { + return this.ctx.storage; + } + + // ── Public RPC ───────────────────────────────────────────────────────── + + /** + * HTTP entry used by the workflow node (which lives in a package that can't + * import this DO's types, so it calls over `stub.fetch` rather than RPC). + */ + async fetch(request: Request): Promise { + const url = new URL(request.url); + if (url.pathname.endsWith("/start") && request.method === "POST") { + const body = (await request.json()) as EmailAgentRunRequest; + return Response.json(await this.start(body)); + } + return new Response("Not found", { status: 404 }); + } + + /** + * Begin a run. Idempotent: if the run already completed, re-sends the + * completion event (in case the prior send was lost) and returns. + */ + async start(request: EmailAgentRunRequest): Promise<{ status: string }> { + const status = await this.storage.get("status"); + + if (status === "completed") { + const cached = await this.storage.get("completion"); + if (cached) { + await this.sendCompletionEvent(request, cached); + } + return { status: "completed" }; + } + if (status === "running" || status === "waiting") { + return { status }; + } + + // Resolve the sending address up-front so a misconfiguration fails fast. + const db = createDatabase(this.env.DB); + const email = await getEmailByHandle(db, request.fromHandle.toLowerCase()); + if (!email || email.organizationId !== request.organizationId) { + await this.fail( + request, + `Sending address '${request.fromHandle}' not found for this organization` + ); + return { status: "error" }; + } + + await this.storage.put("request", request); + await this.storage.put("emailId", email.id); + await this.storage.put("status", "running" satisfies RunStatus); + + // Run the first turn in the background so the RPC returns promptly. + this.ctx.waitUntil(this.runLoop()); + return { status: "started" }; + } + + /** + * Deliver an inbound reply for one of the threads this run is waiting on. + * Called by the inbound-mail handler. Returns whether the reply matched a + * pending ask (false → fall through to normal handling). + */ + async deliverReply(args: { + threadId: string; + text: string; + }): Promise<{ accepted: boolean }> { + const pending = (await this.storage.get("pending")) ?? []; + if (!settleReply(pending, args.threadId, args.text)) { + return { accepted: false }; + } + + await this.storage.put("pending", pending); + + if (allSettled(pending)) { + await this.storage.deleteAlarm(); + this.ctx.waitUntil(this.resumeFromPending(pending)); + } + return { accepted: true }; + } + + /** Per-reply deadline: settle expired asks with a sentinel, then maybe resume. */ + async alarm(): Promise { + const pending = (await this.storage.get("pending")) ?? []; + if (pending.length === 0) return; + + const expired = settleExpired(pending, Date.now(), NO_REPLY_SENTINEL); + if (expired.length > 0) { + await this.storage.put("pending", pending); + await this.markTimedOut(expired.map((p) => p.threadId)); + } + + if (allSettled(pending)) { + this.ctx.waitUntil(this.resumeFromPending(pending)); + } else { + const next = nextWaitingDeadline(pending); + if (next !== undefined) await this.storage.setAlarm(next); + } + } + + // ── Loop driver ────────────────────────────────────────────────────────── + + /** Run the agent loop forward until it suspends on asks or completes. */ + private async runLoop(resume?: ResolvedToolResult[]): Promise { + const request = await this.storage.get("request"); + if (!request) return; + + try { + const state = await this.storage.get("state"); + const tools = await this.buildTools(request); + const jsonSchema = toJsonSchema(request.schema); + const instructions = this.systemPrompt(request); + + const llm = ( + messages: AgentMessage[], + llmTools: ToolDefinition[], + schema?: Record + ) => + callAgentLLM(this.env, { + provider: request.provider, + model: request.model, + instructions, + messages, + tools: llmTools, + schema, + }); + + const callLLM = (messages: AgentMessage[], llmTools: ToolDefinition[]) => + llm(messages, llmTools); + + const callFinalLLM = jsonSchema + ? (messages: AgentMessage[], llmTools: ToolDefinition[]) => + llm(messages, llmTools, jsonSchema) + : undefined; + + const outcome = await runAgentLoop({ + userMessage: this.userMessage(request), + tools, + maxSteps: Math.max(1, request.maxRounds), + callLLM, + callFinalLLM, + isSuspendingTool: (name) => name === ASK_TOOL, + ...(state ? { resumeState: state } : {}), + ...(resume ? { resumeToolResults: resume } : {}), + }); + + if (outcome.status === "suspended") { + await this.storage.put("state", outcome.state); + await this.parkOnAsks(request, outcome.pendingToolCalls); + return; + } + + // Completed (goal reached or round limit). + const finishReason = FINISH_REASON[outcome.finishReason]; + + const usage = request.pricing + ? calculateTokenUsage( + outcome.totalInputTokens, + outcome.totalOutputTokens, + request.pricing + ) + : 1; + + const completion: EmailCompletion = { + result: outcome.text, + transcript: await this.buildTranscript(request), + rounds: outcome.totalSteps, + finishReason, + totalInputTokens: outcome.totalInputTokens, + totalOutputTokens: outcome.totalOutputTokens, + usage, + ...(finishReason === "error" ? { error: outcome.text } : {}), + }; + + await this.complete(request, completion); + } catch (error) { + console.error("EmailAgentRunner runLoop error:", error); + await this.fail( + request, + error instanceof Error ? error.message : "Email agent failed" + ); + } + } + + /** + * Send the emails for a fan-out of `ask_interlocutor` calls and park the run. + * Asks that can't be resolved/sent settle immediately with an error so the + * barrier still completes; if none remain waiting, resume right away. + */ + private async parkOnAsks( + request: EmailAgentRunRequest, + asks: { id: string; name: string; arguments: Record }[] + ): Promise { + const mailbox = new CloudflareMailboxService(this.env); + const mailboxStub = this.mailboxStub(request.organizationId); + const emailId = (await this.storage.get("emailId")) ?? ""; + const threads = + (await this.storage.get>("threads")) ?? {}; + + const deadline = Date.now() + request.replyTimeoutMs; + const pending: PendingAsk[] = []; + + for (const ask of asks) { + const interlocutorId = String(ask.arguments.interlocutor ?? ""); + const message = String(ask.arguments.message ?? ""); + const who = resolveInterlocutor(request.interlocutors, interlocutorId); + + if (!who || !message) { + pending.push({ + toolCallId: ask.id, + interlocutorId, + threadId: "", + deadline, + status: "settled", + result: who + ? "(no message provided)" + : `(unknown interlocutor '${interlocutorId}')`, + }); + continue; + } + + const existing = threads[who.id]; + const subject = existing + ? `Re: ${existing.subject.replace(/^re:\s*/i, "")}` + : (ask.arguments.subject as string) || + request.subject || + `Regarding: ${request.objective.slice(0, 60)}`; + + try { + const sent = await mailbox.sendThreaded({ + organizationId: request.organizationId, + emailId, + to: who.email, + subject, + text: message, + ...(existing ? { threadId: existing.threadId } : {}), + }); + + threads[who.id] = { + threadId: sent.threadId, + subject: existing?.subject ?? subject, + }; + // Tag the mailbox thread so its replies route back to this run. + await mailboxStub.setThreadAgentRun(sent.threadId, request.runId); + pending.push({ + toolCallId: ask.id, + interlocutorId: who.id, + threadId: sent.threadId, + deadline, + status: "waiting", + }); + } catch (error) { + pending.push({ + toolCallId: ask.id, + interlocutorId: who.id, + threadId: "", + deadline, + status: "settled", + result: `(failed to send: ${ + error instanceof Error ? error.message : "unknown error" + })`, + }); + } + } + + await this.storage.put("threads", threads); + await this.storage.put("pending", pending); + await this.storage.put("status", "waiting" satisfies RunStatus); + + if (allSettled(pending)) { + // Nothing to wait for (all failed/invalid) — resume immediately. + this.ctx.waitUntil(this.resumeFromPending(pending)); + } else { + await this.storage.setAlarm(deadline); + } + } + + /** Build resume results from settled asks and continue the loop. */ + private async resumeFromPending(pending: PendingAsk[]): Promise { + const resume = toResumeResults(pending, ASK_TOOL, NO_REPLY_SENTINEL); + await this.storage.delete("pending"); + await this.storage.put("status", "running" satisfies RunStatus); + await this.runLoop(resume); + } + + // ── Completion / failure ───────────────────────────────────────────────── + + private async complete( + request: EmailAgentRunRequest, + completion: EmailCompletion + ): Promise { + await this.storage.put("completion", completion); + await this.storage.put("status", "completed" satisfies RunStatus); + await this.releaseThreads(request.organizationId); + await this.sendCompletionEvent(request, completion); + } + + private async fail( + request: EmailAgentRunRequest, + message: string + ): Promise { + await this.storage.put("status", "error" satisfies RunStatus); + try { + await this.releaseThreads(request.organizationId); + } catch { + // best-effort + } + try { + const instance = await this.env.EXECUTE.get(request.executionInstanceId); + await instance.sendEvent({ + type: `email-agent-complete-${request.nodeId}`, + payload: { outputs: {}, usage: 0, error: message }, + }); + } catch (error) { + console.error("EmailAgentRunner failed to send error event:", error); + } + } + + private async sendCompletionEvent( + request: EmailAgentRunRequest, + completion: EmailCompletion + ): Promise { + const instance = await this.env.EXECUTE.get(request.executionInstanceId); + await instance.sendEvent({ + type: `email-agent-complete-${request.nodeId}`, + payload: { + outputs: { + result: completion.result, + transcript: completion.transcript, + rounds: completion.rounds, + finish_reason: completion.finishReason, + usage_metadata: { + totalInputTokens: completion.totalInputTokens, + totalOutputTokens: completion.totalOutputTokens, + }, + }, + usage: completion.usage, + ...(completion.error ? { error: completion.error } : {}), + }, + }); + } + + // ── Helpers ──────────────────────────────────────────────────────────── + + private async buildTools( + request: EmailAgentRunRequest + ): Promise { + const askTool: ToolDefinition = { + name: ASK_TOOL, + description: + "Email a specific interlocutor and wait for their reply. Issue multiple " + + "calls in one turn to ask several interlocutors in parallel.", + parameters: { + type: "object", + properties: { + interlocutor: { + type: "string", + enum: request.interlocutors.map((i) => i.id), + description: "Id of the interlocutor to email", + }, + message: { + type: "string", + description: "The email body to send (plain text)", + }, + subject: { + type: "string", + description: "Optional subject for a new thread", + }, + }, + required: ["interlocutor", "message"], + }, + // Never invoked: suspending tools are intercepted by the loop. + function: async () => NO_REPLY_SENTINEL, + }; + + const userTools = request.tools?.length + ? applyCodeMode( + this.env, + await resolveTools( + request.tools, + await buildNodeToolProvider(this.env, request.organizationId) + ), + false + ) + : []; + + return [askTool, ...userTools]; + } + + private systemPrompt(request: EmailAgentRunRequest): string { + const roster = request.interlocutors + .map((i) => { + const bits = [i.role && `role=${i.role}`, i.name && `name=${i.name}`] + .filter(Boolean) + .join(", "); + return `- id=${i.id}${bits ? ` (${bits})` : ""}`; + }) + .join("\n"); + + const persona = request.instructions?.trim(); + return [ + persona, + "You coordinate with interlocutors over email to accomplish the objective. " + + `Use the ${ASK_TOOL} tool to email an interlocutor and wait for their reply. ` + + "You may contact several interlocutors in parallel by issuing multiple " + + `${ASK_TOOL} calls in a single turn. A reply of "${NO_REPLY_SENTINEL}" ` + + "means that interlocutor did not respond in time — decide how to proceed. " + + "When you have achieved the objective, stop calling tools and give your final result.", + `Interlocutors you may contact:\n${roster}`, + ] + .filter(Boolean) + .join("\n\n"); + } + + private userMessage(request: EmailAgentRunRequest): string { + return request.context + ? `Objective:\n${request.objective}\n\nContext:\n${request.context}` + : `Objective:\n${request.objective}`; + } + + private mailboxStub(organizationId: string) { + return this.env.MAILBOX.get( + this.env.MAILBOX.idFromName(`mailbox:${organizationId}`) + ); + } + + /** Record thread ids whose interlocutor never replied before the deadline. */ + private async markTimedOut(threadIds: string[]): Promise { + const timedOut = (await this.storage.get("timedOut")) ?? []; + timedOut.push(...threadIds); + await this.storage.put("timedOut", timedOut); + } + + /** Release mailbox-thread ownership so late replies fall through to triggers. */ + private async releaseThreads(organizationId: string): Promise { + const threads = + (await this.storage.get>("threads")) ?? {}; + const stub = this.mailboxStub(organizationId); + await Promise.all( + Object.values(threads).map((ref) => + stub.setThreadAgentRun(ref.threadId, null) + ) + ); + } + + /** + * Assemble the conversation transcript from the mailbox — the system of record + * for every sent message and reply — one entry per interlocutor thread. + */ + private async buildTranscript( + request: EmailAgentRunRequest + ): Promise { + const threads = + (await this.storage.get>("threads")) ?? {}; + const timedOut = new Set( + (await this.storage.get("timedOut")) ?? [] + ); + const stub = this.mailboxStub(request.organizationId); + + return Promise.all( + Object.entries(threads).map(async ([interlocutorId, ref]) => ({ + interlocutorId, + threadId: ref.threadId, + timedOut: timedOut.has(ref.threadId), + messages: await stub.listThreadMessages(ref.threadId), + })) + ); + } +} + +interface EmailCompletion { + result: string; + transcript: TranscriptThread[]; + rounds: number; + finishReason: "goal_reached" | "max_rounds" | "error"; + totalInputTokens: number; + totalOutputTokens: number; + usage: number; + error?: string; +} + +/** Maps the agent loop's finish reason onto the email completion's vocabulary. */ +const FINISH_REASON: Record = { + completed: "goal_reached", + max_steps_reached: "max_rounds", + error: "error", +}; + +function resolveInterlocutor( + interlocutors: EmailInterlocutor[], + ref: string +): EmailInterlocutor | undefined { + const needle = ref.trim().toLowerCase(); + return ( + interlocutors.find((i) => i.id.toLowerCase() === needle) ?? + interlocutors.find((i) => i.email.toLowerCase() === needle) ?? + interlocutors.find((i) => i.name?.toLowerCase() === needle) + ); +} diff --git a/apps/api/src/durable-objects/mailbox-do.ts b/apps/api/src/durable-objects/mailbox-do.ts index d358ebda..148d1589 100644 --- a/apps/api/src/durable-objects/mailbox-do.ts +++ b/apps/api/src/durable-objects/mailbox-do.ts @@ -113,9 +113,17 @@ export class MailboxDO extends DurableObject { archived_at INTEGER, last_message_at INTEGER NOT NULL, created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL + updated_at INTEGER NOT NULL, + agent_run_id TEXT ) `); + // Backfill the column on mailboxes created before email-agent support. + const threadCols = sql + .exec(`PRAGMA table_info(threads)`) + .toArray() as Record[]; + if (!threadCols.some((c) => c.name === "agent_run_id")) { + sql.exec(`ALTER TABLE threads ADD COLUMN agent_run_id TEXT`); + } sql.exec( `CREATE INDEX IF NOT EXISTS threads_email_id_idx ON threads(email_id)` ); @@ -178,9 +186,12 @@ export class MailboxDO extends DurableObject { * fromEmail within 30 days, else a new thread. A verified reply-token short- * circuits resolution. */ - async ingestInbound( - args: IngestInboundArgs - ): Promise<{ threadId: string; messageId: string }> { + async ingestInbound(args: IngestInboundArgs): Promise<{ + threadId: string; + messageId: string; + /** EmailAgentRunner that owns this thread, if it's an agent conversation. */ + agentRunId: string | null; + }> { this.ensureSchema(); const now = Date.now(); const threadId = this.resolveThread(args) ?? this.createThread(args, now); @@ -226,7 +237,26 @@ export class MailboxDO extends DurableObject { ); } - return { threadId, messageId: args.messageId }; + const agentRunId = this.threadAgentRunId(threadId); + return { threadId, messageId: args.messageId, agentRunId }; + } + + /** + * Tag (or clear) the EmailAgentRunner that owns a thread. An owned thread's + * inbound replies are routed to that run instead of triggering workflows; + * pass `null` to release ownership when the run finishes. + */ + async setThreadAgentRun( + threadId: string, + runId: string | null + ): Promise { + this.ensureSchema(); + this.ctx.storage.sql.exec( + `UPDATE threads SET agent_run_id = ?, updated_at = ? WHERE id = ?`, + runId, + Date.now(), + threadId + ); } /** Pre-insert an outbound message before the send is attempted. */ @@ -385,6 +415,13 @@ export class MailboxDO extends DurableObject { // ── Internal helpers ──────────────────────────────────────────────────── + private threadAgentRunId(threadId: string): string | null { + const rows = this.ctx.storage.sql + .exec(`SELECT agent_run_id FROM threads WHERE id = ? LIMIT 1`, threadId) + .toArray() as Record[]; + return (rows[0]?.agent_run_id as string | null) ?? null; + } + private resolveThread(args: IngestInboundArgs): string | undefined { const sql = this.ctx.storage.sql; diff --git a/apps/api/src/email.ts b/apps/api/src/email.ts index c1597e4f..55a70840 100644 --- a/apps/api/src/email.ts +++ b/apps/api/src/email.ts @@ -112,6 +112,21 @@ export async function handleIncomingEmail( subaddress, }); + // If the mailbox flagged this thread as owned by an email-agent run, hand the + // reply to that runner instead of triggering workflows afresh. + if (mailbox?.agentRunId) { + await deliverReplyToEmailAgent( + env, + mailbox.agentRunId, + mailbox.threadId, + mailbox.text ?? "" + ); + console.log( + `[email] reply on thread ${mailbox.threadId} routed to email agent run ${mailbox.agentRunId}` + ); + return; + } + // Get all workflows triggered by this email const emailTriggersWithWorkflows = await getEmailTriggersByEmail( db, @@ -177,7 +192,15 @@ async function persistInboundEmail({ from: string; to: string; subaddress: string | null; -}): Promise<{ threadId: string; messageId: string } | undefined> { +}): Promise< + | { + threadId: string; + messageId: string; + agentRunId: string | null; + text?: string; + } + | undefined +> { try { const messageId = uuidv7(); const staged = await parseAndStageEmail( @@ -205,7 +228,7 @@ async function persistInboundEmail({ ...staged, verifiedThreadId, }); - return result; + return { ...result, text: staged.text }; } catch (error) { console.error( `[email] failed to persist inbound message for ${email.handle}:`, @@ -215,6 +238,32 @@ async function persistInboundEmail({ } } +/** + * Deliver a reply to the EmailAgentRunner that owns its thread (best effort). + * The caller has already decided the thread is agent-owned, so the message is + * treated as handled regardless of the outcome — re-triggering workflows for an + * agent-owned thread would be wrong even if the specific ask already settled. + */ +async function deliverReplyToEmailAgent( + env: Bindings, + runId: string, + threadId: string, + text: string +): Promise { + if (!env.EMAIL_AGENT_RUNNER) return; + try { + const stub = env.EMAIL_AGENT_RUNNER.get( + env.EMAIL_AGENT_RUNNER.idFromName(runId) + ); + await stub.deliverReply({ threadId, text }); + } catch (error) { + console.error( + `[email] failed to deliver reply to email agent run ${runId}:`, + error instanceof Error ? error.message : String(error) + ); + } +} + async function triggerWorkflowForEmail({ workflow, email, diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index d36b1152..06fc437a 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -146,6 +146,7 @@ export { Sandbox } from "@cloudflare/sandbox"; export { FFmpegContainer } from "./containers/ffmpeg-container"; export { LanguageSandbox } from "./containers/language-sandbox"; export { AgentRunner } from "./durable-objects/agent-runner"; +export { EmailAgentRunner } from "./durable-objects/email-agent-runner"; export { MailboxDO } from "./durable-objects/mailbox-do"; export default { diff --git a/apps/api/src/mailbox-staging.ts b/apps/api/src/mailbox-staging.ts index 66c2fad6..fc52a286 100644 --- a/apps/api/src/mailbox-staging.ts +++ b/apps/api/src/mailbox-staging.ts @@ -27,6 +27,8 @@ export interface StagedEmail { references: string[]; referencesChain: string | null; snippet: string; + /** Full parsed plain-text body (falls back to stripped HTML), if any. */ + text?: string; hasHtml: boolean; hasText: boolean; rawR2Key: string; @@ -134,6 +136,7 @@ export async function parseAndStageEmail( references, referencesChain, snippet, + text: textBody ?? (htmlBody ? stripHtml(htmlBody) : undefined), hasHtml: Boolean(htmlBody), hasText: Boolean(textBody), rawR2Key: keys.raw, diff --git a/apps/api/src/runtime/cloudflare-mailbox-service.ts b/apps/api/src/runtime/cloudflare-mailbox-service.ts index 8c546ca8..fd86aeca 100644 --- a/apps/api/src/runtime/cloudflare-mailbox-service.ts +++ b/apps/api/src/runtime/cloudflare-mailbox-service.ts @@ -159,7 +159,7 @@ export class CloudflareMailboxService implements MailboxService { console.error("[mailbox send] touchThread failed", error); }); - return { messageId }; + return { messageId, threadId }; } async getThread( diff --git a/apps/api/src/runtime/cloudflare-node-registry.ts b/apps/api/src/runtime/cloudflare-node-registry.ts index 8951c596..d85653e6 100644 --- a/apps/api/src/runtime/cloudflare-node-registry.ts +++ b/apps/api/src/runtime/cloudflare-node-registry.ts @@ -98,6 +98,7 @@ import { ListUserGuildsDiscordNode } from "@dafthunk/runtime/nodes/discord/list- import { SendDMDiscordNode } from "@dafthunk/runtime/nodes/discord/send-dm-discord-node"; import { SendMessageDiscordNode } from "@dafthunk/runtime/nodes/discord/send-message-discord-node"; import { ToMarkdownNode } from "@dafthunk/runtime/nodes/document/to-markdown-node"; +import { EmailAgentClaudeSonnet4Node } from "@dafthunk/runtime/nodes/email/email-agent-claude-sonnet-4-node"; import { ExtractEmailAttachmentsNode } from "@dafthunk/runtime/nodes/email/extract-email-attachments-node"; import { GetEmailThreadNode } from "@dafthunk/runtime/nodes/email/get-email-thread-node"; import { ParseEmailNode } from "@dafthunk/runtime/nodes/email/parse-email-node"; @@ -780,6 +781,8 @@ export class CloudflareNodeRegistry extends BaseNodeRegistry { if (hasSendEmail) { this.registerImplementation(SendEmailNode); + // Email coordination agent — sends + waits for replies via Durable Object + this.registerImplementation(EmailAgentClaudeSonnet4Node); } if (hasGoogleMail) { diff --git a/apps/api/src/test-entry.ts b/apps/api/src/test-entry.ts index 76eacac8..464ae34e 100644 --- a/apps/api/src/test-entry.ts +++ b/apps/api/src/test-entry.ts @@ -24,6 +24,8 @@ * - Avoids importing CloudflareNodeRegistry and heavy dependencies */ +// Export EmailAgentRunner so the email-agent flow can be exercised in tests. +export { EmailAgentRunner } from "./durable-objects/email-agent-runner"; // Export MailboxDO so the per-org mailbox can be exercised in tests. export { MailboxDO } from "./durable-objects/mailbox-do"; // Export WorkflowAgent for Durable Object testing (if needed in future) diff --git a/apps/api/src/utils/encryption.test.ts b/apps/api/src/utils/encryption.test.ts index 1ae46612..ad015cf2 100644 --- a/apps/api/src/utils/encryption.test.ts +++ b/apps/api/src/utils/encryption.test.ts @@ -43,6 +43,7 @@ const createMockEnv = (masterKey?: string): Bindings => ({ CLOUDFLARE_AI_GATEWAY_ID: "", AI_OPTIONS: {}, AGENT_RUNNER: {} as DurableObjectNamespace, + EMAIL_AGENT_RUNNER: {} as DurableObjectNamespace, MAILBOX: {} as DurableObjectNamespace, }); diff --git a/apps/api/wrangler.jsonc b/apps/api/wrangler.jsonc index 831bfb92..8e019633 100644 --- a/apps/api/wrangler.jsonc +++ b/apps/api/wrangler.jsonc @@ -163,6 +163,11 @@ "name": "MAILBOX", "class_name": "MailboxDO", "script_name": "dafthunk-api" + }, + { + "name": "EMAIL_AGENT_RUNNER", + "class_name": "EmailAgentRunner", + "script_name": "dafthunk-api" } ] }, @@ -213,6 +218,10 @@ { "tag": "v12", "new_sqlite_classes": ["MailboxDO"] + }, + { + "tag": "v13", + "new_sqlite_classes": ["EmailAgentRunner"] } ], "unsafe": { @@ -384,6 +393,11 @@ "name": "MAILBOX", "class_name": "MailboxDO", "script_name": "dafthunk-api" + }, + { + "name": "EMAIL_AGENT_RUNNER", + "class_name": "EmailAgentRunner", + "script_name": "dafthunk-api" } ] }, @@ -434,6 +448,10 @@ { "tag": "v12", "new_sqlite_classes": ["MailboxDO"] + }, + { + "tag": "v13", + "new_sqlite_classes": ["EmailAgentRunner"] } ], "unsafe": { diff --git a/apps/api/wrangler.test.jsonc b/apps/api/wrangler.test.jsonc index d4c9bd9b..5415dba4 100644 --- a/apps/api/wrangler.test.jsonc +++ b/apps/api/wrangler.test.jsonc @@ -47,6 +47,10 @@ { "name": "MAILBOX", "class_name": "MailboxDO" + }, + { + "name": "EMAIL_AGENT_RUNNER", + "class_name": "EmailAgentRunner" } ] }, @@ -54,6 +58,10 @@ { "tag": "v1", "new_sqlite_classes": ["MailboxDO"] + }, + { + "tag": "v2", + "new_sqlite_classes": ["EmailAgentRunner"] } ], "analytics_engine_datasets": [ diff --git a/apps/app/package.json b/apps/app/package.json index 7cdcb5ff..c3223ad2 100644 --- a/apps/app/package.json +++ b/apps/app/package.json @@ -55,7 +55,7 @@ "react": "^19.2.6", "react-day-picker": "^9.14.0", "react-dom": "^19.2.6", - "react-router": "^7.15.0", + "react-router": "^7.18.0", "recharts": "^3.8.1", "sonner": "^2.0.7", "swr": "^2.4.1" @@ -78,8 +78,8 @@ "tsx": "^4.21.0", "tw-animate-css": "^1.4.0", "typescript": "^5.9.3", - "vite": "^7.3.3", - "vitest": "^4.1.6", + "vite": "^7.3.5", + "vitest": "^4.1.9", "wrangler": "^4.90.1" } } diff --git a/apps/www/package.json b/apps/www/package.json index c35a421e..f06b5a33 100644 --- a/apps/www/package.json +++ b/apps/www/package.json @@ -23,14 +23,14 @@ "lucide-react": "^0.562.0", "react": "^19.2.6", "react-dom": "^19.2.6", - "react-router": "^7.15.0", + "react-router": "^7.18.0", "shiki": "^3.23.0", "tailwind-merge": "^3.6.0" }, "devDependencies": { "@cloudflare/vite-plugin": "^1.36.4", "@cloudflare/workers-types": "^4.20260511.1", - "@react-router/dev": "^7.15.0", + "@react-router/dev": "^7.18.0", "@tailwindcss/typography": "^0.5.19", "@tailwindcss/vite": "^4.3.0", "@types/react": "^19.2.14", @@ -38,8 +38,8 @@ "tailwindcss": "^4.3.0", "tsx": "^4.21.0", "typescript": "^5.9.3", - "vite": "^7.3.3", - "vitest": "^4.1.6", + "vite": "^7.3.5", + "vitest": "^4.1.9", "wrangler": "^4.90.1" } } diff --git a/packages/geo/package.json b/packages/geo/package.json index 7cdf3d0e..7426ded5 100644 --- a/packages/geo/package.json +++ b/packages/geo/package.json @@ -22,6 +22,6 @@ "@turf/turf": "^7.3.5", "@types/d3-delaunay": "^6.0.4", "typescript": "^5.9.3", - "vitest": "^4.1.6" + "vitest": "^4.1.9" } } diff --git a/packages/runtime/package.json b/packages/runtime/package.json index 67de6e09..0bff3067 100644 --- a/packages/runtime/package.json +++ b/packages/runtime/package.json @@ -28,7 +28,7 @@ "@gltf-transform/core": "^4.3.0", "@google/genai": "^1.52.0", "@mapbox/martini": "^0.2.0", - "exifreader": "^4.38.1", + "exifreader": "^4.41.0", "geotiff": "^2.1.3", "jsonpath-plus": "^10.4.0", "openai": "^6.37.0", @@ -45,6 +45,6 @@ "@types/three": "^0.181.0", "@types/wellknown": "^0.5.8", "typescript": "^5.9.3", - "vitest": "^4.1.6" + "vitest": "^4.1.9" } } diff --git a/packages/runtime/src/mailbox-service.ts b/packages/runtime/src/mailbox-service.ts index 14dff9ed..97691e5a 100644 --- a/packages/runtime/src/mailbox-service.ts +++ b/packages/runtime/src/mailbox-service.ts @@ -55,6 +55,8 @@ export interface SendThreadedArgs { export interface SendThreadedResult { messageId: string; + /** Thread the message was sent on — newly opened when no `threadId` was given. */ + threadId: string; } export interface MailboxService { diff --git a/packages/runtime/src/node-types.ts b/packages/runtime/src/node-types.ts index e8c1ff10..113e35e4 100644 --- a/packages/runtime/src/node-types.ts +++ b/packages/runtime/src/node-types.ts @@ -221,6 +221,8 @@ export interface NodeEnv { // eslint-disable-next-line @typescript-eslint/no-explicit-any AGENT_RUNNER: DurableObjectNamespace; // eslint-disable-next-line @typescript-eslint/no-explicit-any + EMAIL_AGENT_RUNNER?: DurableObjectNamespace; + // eslint-disable-next-line @typescript-eslint/no-explicit-any FFMPEG_CONTAINER?: DurableObjectNamespace; // eslint-disable-next-line @typescript-eslint/no-explicit-any DUCKDB_SANDBOX?: DurableObjectNamespace; diff --git a/packages/runtime/src/nodes/email/base-email-agent-node.ts b/packages/runtime/src/nodes/email/base-email-agent-node.ts new file mode 100644 index 00000000..550369cf --- /dev/null +++ b/packages/runtime/src/nodes/email/base-email-agent-node.ts @@ -0,0 +1,307 @@ +import type { NodeExecution, NodeType } from "@dafthunk/types"; + +import type { NodeContext } from "../../node-types"; +import { ExecutableNode } from "../../node-types"; +import type { TokenPricing } from "../../utils/usage"; +import type { AgentProvider } from "../agent/base-agent-node"; + +export interface EmailAgentNodeConfig { + provider: AgentProvider; + model: string; + pricing: TokenPricing; +} + +const DEFAULT_REPLY_TIMEOUT = "3 days"; +const DEFAULT_MAX_ROUNDS = 10; +/** Upper bound for the workflow's backstop wait (the DO finishes well before). */ +const MAX_BACKSTOP_MS = 30 * 24 * 60 * 60 * 1000; + +const EMAIL_AGENT_INPUTS: NodeType["inputs"] = [ + { + name: "from", + description: + "Handle of the organization email address to send from (the part before @).", + type: "string", + required: true, + }, + { + name: "interlocutors", + description: + "People the agent may email: an array of { id, email, name?, role? }. The agent addresses them by id.", + type: "json", + required: true, + }, + { + name: "objective", + description: "The goal the agent should pursue through the conversation.", + type: "string", + required: true, + }, + { + name: "instructions", + description: "Optional persona / behavioural system prompt.", + type: "string", + required: false, + }, + { + name: "context", + description: "Optional background material for the agent.", + type: "string", + required: false, + }, + { + name: "subject", + description: "Default subject for newly opened email threads.", + type: "string", + required: false, + }, + { + name: "max_rounds", + description: `Maximum conversation rounds before wrapping up (default ${DEFAULT_MAX_ROUNDS}).`, + type: "number", + required: false, + value: DEFAULT_MAX_ROUNDS, + }, + { + name: "reply_timeout", + description: `How long to wait for each reply, e.g. "3 days", "24 hours" (default ${DEFAULT_REPLY_TIMEOUT}).`, + type: "string", + required: false, + value: DEFAULT_REPLY_TIMEOUT, + }, + { + name: "tools", + description: "Tool references the agent may call between emails.", + type: "json", + required: false, + }, + { + name: "schema", + description: "Optional schema constraining the final result.", + type: "schema", + required: false, + hidden: true, + }, +]; + +const EMAIL_AGENT_OUTPUTS: NodeType["outputs"] = [ + { + name: "result", + description: "The agent's final result once the objective is reached.", + type: "any", + }, + { + name: "transcript", + description: + "Per-interlocutor record of the messages sent and replies received.", + type: "json", + }, + { + name: "rounds", + description: "Number of conversation rounds taken.", + type: "number", + }, + { + name: "finish_reason", + description: "Why the agent stopped: goal_reached, max_rounds, or error.", + type: "string", + }, + { + name: "usage_metadata", + description: "Token usage for the run.", + type: "json", + hidden: true, + }, +]; + +export function buildEmailAgentNodeType(meta: { + id: string; + name: string; + description: string; + tags: string[]; + documentation: string; + subscription?: boolean; +}): NodeType { + return { + id: meta.id, + name: meta.name, + type: meta.id, + description: meta.description, + tags: meta.tags, + icon: "mail", + documentation: meta.documentation, + usage: 1, + subscription: meta.subscription, + functionCalling: true, + inputs: EMAIL_AGENT_INPUTS, + outputs: EMAIL_AGENT_OUTPUTS, + }; +} + +interface NormalizedInterlocutor { + id: string; + email: string; + name?: string; + role?: string; +} + +/** + * Base class for email-coordination agents. The agent emails one or more + * interlocutors and waits — potentially for days — for their replies, looping + * until the objective is met. Execution is delegated to the EmailAgentRunner + * Durable Object; this node fires it and parks on a completion event. + * + * Requires durable workflow execution (not available in worker mode). + */ +export abstract class BaseEmailAgentNode extends ExecutableNode { + protected static readonly agentConfig: EmailAgentNodeConfig; + + async execute(context: NodeContext): Promise { + const config = (this.constructor as typeof BaseEmailAgentNode).agentConfig; + + if (!context.asyncSupported || !context.executionId) { + return this.createErrorResult( + "Email Agent requires durable workflow execution (not available in worker mode)" + ); + } + + // Cast to the non-generic namespace to avoid deep `` stub typing. + const runner = context.env.EMAIL_AGENT_RUNNER as + | DurableObjectNamespace + | undefined; + if (!runner) { + return this.createErrorResult("Email agent runner is not available"); + } + + const from = (context.inputs.from as string | undefined)?.trim(); + const objective = (context.inputs.objective as string | undefined)?.trim(); + if (!from) return this.createErrorResult("'from' handle is required"); + if (!objective) return this.createErrorResult("'objective' is required"); + + const interlocutors = normalizeInterlocutors(context.inputs.interlocutors); + if (interlocutors.length === 0) { + return this.createErrorResult( + "At least one interlocutor with an email address is required" + ); + } + + const maxRounds = clampInt( + context.inputs.max_rounds as number | undefined, + DEFAULT_MAX_ROUNDS + ); + const replyTimeoutMs = parseDuration( + context.inputs.reply_timeout as string | undefined, + DEFAULT_REPLY_TIMEOUT + ); + + const runId = `${context.executionId}:${context.nodeId}`; + const request = { + runId, + executionInstanceId: context.executionId, + nodeId: context.nodeId, + provider: config.provider, + model: config.model, + pricing: config.pricing, + organizationId: context.organizationId, + fromHandle: from, + interlocutors, + objective, + instructions: (context.inputs.instructions as string) || "", + context: (context.inputs.context as string) || undefined, + subject: (context.inputs.subject as string) || undefined, + maxRounds, + replyTimeoutMs, + tools: context.inputs.tools ?? [], + ...(context.inputs.schema ? { schema: context.inputs.schema } : {}), + }; + + try { + const stub = runner.get(runner.idFromName(runId)); + const response = await stub.fetch("https://email-agent/start", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(request), + }); + if (!response.ok) { + const error = (await response.json().catch(() => ({}))) as { + error?: string; + }; + return this.createErrorResult( + error.error ?? "Failed to start email agent" + ); + } + } catch (error) { + return this.createErrorResult( + error instanceof Error ? error.message : "Failed to start email agent" + ); + } + + // Park until the runner reports completion. The backstop must outlast the + // worst-case wait; the runner's per-reply alarm settles the run far sooner. + const backstopMs = Math.min( + maxRounds * replyTimeoutMs + 60 * 60 * 1000, + MAX_BACKSTOP_MS + ); + + return { + nodeId: this.node.id, + status: "pending", + usage: 0, + pendingEvent: { + type: `email-agent-complete-${context.nodeId}`, + timeout: formatDuration(backstopMs), + }, + }; + } +} + +function normalizeInterlocutors(value: unknown): NormalizedInterlocutor[] { + if (!Array.isArray(value)) return []; + const out: NormalizedInterlocutor[] = []; + for (const raw of value) { + if (!raw || typeof raw !== "object") continue; + const r = raw as Record; + const email = typeof r.email === "string" ? r.email.trim() : ""; + if (!email) continue; + const id = typeof r.id === "string" && r.id.trim() ? r.id.trim() : email; + out.push({ + id, + email, + ...(typeof r.name === "string" ? { name: r.name } : {}), + ...(typeof r.role === "string" ? { role: r.role } : {}), + }); + } + return out; +} + +function clampInt(value: number | undefined, fallback: number): number { + if (typeof value !== "number" || !Number.isFinite(value) || value < 1) { + return fallback; + } + return Math.floor(value); +} + +const UNIT_MS: Record = { + minute: 60 * 1000, + hour: 60 * 60 * 1000, + day: 24 * 60 * 60 * 1000, +}; + +/** Parse a " " duration (minutes/hours/days) into milliseconds. */ +function parseDuration(value: string | undefined, fallback: string): number { + const match = (value ?? fallback) + .trim() + .toLowerCase() + .match(/^(\d+(?:\.\d+)?)\s*(minute|hour|day)s?$/); + if (!match) return parseDuration(fallback, "3 days"); + return Math.round(parseFloat(match[1]) * UNIT_MS[match[2]]); +} + +/** Format milliseconds as a Cloudflare Workflows timeout string. */ +function formatDuration(ms: number): string { + const day = UNIT_MS.day; + const hour = UNIT_MS.hour; + if (ms >= day) return `${Math.ceil(ms / day)} days`; + if (ms >= hour) return `${Math.ceil(ms / hour)} hours`; + return `${Math.max(1, Math.ceil(ms / UNIT_MS.minute))} minutes`; +} diff --git a/packages/runtime/src/nodes/email/email-agent-claude-sonnet-4-node.ts b/packages/runtime/src/nodes/email/email-agent-claude-sonnet-4-node.ts new file mode 100644 index 00000000..2df70ecb --- /dev/null +++ b/packages/runtime/src/nodes/email/email-agent-claude-sonnet-4-node.ts @@ -0,0 +1,26 @@ +import type { NodeType } from "@dafthunk/types"; + +import { + BaseEmailAgentNode, + buildEmailAgentNodeType, +} from "./base-email-agent-node"; + +export class EmailAgentClaudeSonnet4Node extends BaseEmailAgentNode { + // https://www.anthropic.com/pricing + protected static readonly agentConfig = { + provider: "anthropic" as const, + model: "claude-sonnet-4-0", + pricing: { inputCostPerMillion: 3.0, outputCostPerMillion: 15.0 }, + }; + + public static readonly nodeType: NodeType = buildEmailAgentNodeType({ + id: "email-agent-claude-sonnet-4", + name: "Email Agent Claude Sonnet 4", + description: + "AI agent that emails one or more interlocutors and waits for their replies, looping until a goal is reached", + tags: ["AI", "Agent", "Email", "Anthropic", "Claude"], + documentation: + "This node runs an agent that coordinates with people over email to accomplish an objective. It can question several interlocutors in parallel and pauses the workflow — for days if needed — until replies arrive, resuming automatically. When the objective is met it returns the result and a transcript of the conversation. Requires durable workflow execution.", + subscription: true, + }); +} diff --git a/packages/runtime/src/nodes/email/send-email-node.test.ts b/packages/runtime/src/nodes/email/send-email-node.test.ts index 0ed57b1b..f8486ffb 100644 --- a/packages/runtime/src/nodes/email/send-email-node.test.ts +++ b/packages/runtime/src/nodes/email/send-email-node.test.ts @@ -12,7 +12,7 @@ describe("SendEmailNode", () => { it("threads + persists via the mailbox service when in mailbox context", async () => { const sendThreaded = vi .fn() - .mockResolvedValue({ messageId: "msg-123" }); + .mockResolvedValue({ messageId: "msg-123", threadId: "thread-1" }); const mailboxService = { sendThreaded, getThread: vi.fn(), diff --git a/packages/runtime/src/utils/agent-loop.test.ts b/packages/runtime/src/utils/agent-loop.test.ts new file mode 100644 index 00000000..c747af84 --- /dev/null +++ b/packages/runtime/src/utils/agent-loop.test.ts @@ -0,0 +1,353 @@ +import { describe, expect, it } from "vitest"; +import type { ToolDefinition } from "../tool-types"; +import { + type AgentLoopConfig, + type AgentMessage, + type AgentToolCall, + type LLMResponse, + runAgentLoop, +} from "./agent-loop"; + +// ── Test helpers ─────────────────────────────────────────────────────────── + +/** A scripted LLM: returns queued responses in order, recording each call. */ +function scriptedLLM(responses: LLMResponse[]) { + let index = 0; + const calls: { messages: AgentMessage[]; tools: ToolDefinition[] }[] = []; + const fn = async ( + messages: AgentMessage[], + tools: ToolDefinition[] + ): Promise => { + calls.push({ messages: structuredClone(messages), tools }); + const response = responses[index]; + if (!response) { + throw new Error(`scriptedLLM ran out of responses at call ${index}`); + } + index += 1; + return response; + }; + return { fn, calls, callCount: () => index }; +} + +function llmText(content: string, tokens = 0): LLMResponse { + return { content, toolCalls: [], inputTokens: tokens, outputTokens: tokens }; +} + +function llmCalls(toolCalls: AgentToolCall[], content = ""): LLMResponse { + return { content, toolCalls, inputTokens: 1, outputTokens: 1 }; +} + +function call( + id: string, + name: string, + args: Record = {} +): AgentToolCall { + return { id, name, arguments: args }; +} + +function syncTool( + name: string, + fn: (args: Record) => string = () => "ok" +): ToolDefinition { + return { + name, + description: name, + parameters: { type: "object", properties: {} }, + function: async (args) => fn(args as Record), + }; +} + +const isAsk = (name: string) => name === "ask"; + +// ── Backward compatibility ───────────────────────────────────────────────── + +describe("runAgentLoop — completion (no suspending tools)", () => { + it("runs a normal tool turn then completes", async () => { + const llm = scriptedLLM([ + llmCalls([call("c1", "lookup")]), + llmText("final answer"), + ]); + const config: AgentLoopConfig = { + userMessage: "hi", + tools: [syncTool("lookup", () => "looked up")], + maxSteps: 5, + callLLM: llm.fn, + }; + + const outcome = await runAgentLoop(config); + + expect(outcome.status).toBe("completed"); + if (outcome.status !== "completed") return; + expect(outcome.text).toBe("final answer"); + expect(outcome.finishReason).toBe("completed"); + expect(outcome.totalSteps).toBe(1); + expect(outcome.steps[0].toolResults[0].content).toBe("looked up"); + }); + + it("completes immediately when the model calls no tools", async () => { + const llm = scriptedLLM([llmText("done")]); + const outcome = await runAgentLoop({ + userMessage: "hi", + tools: [], + maxSteps: 5, + callLLM: llm.fn, + }); + expect(outcome.status).toBe("completed"); + if (outcome.status !== "completed") return; + expect(outcome.text).toBe("done"); + expect(outcome.totalSteps).toBe(0); + }); +}); + +// ── Suspend ───────────────────────────────────────────────────────────────── + +describe("runAgentLoop — suspend", () => { + it("suspends when a suspending tool is called and records no step yet", async () => { + const llm = scriptedLLM([llmCalls([call("a1", "ask", { to: "alice" })])]); + const outcome = await runAgentLoop({ + userMessage: "hi", + tools: [], + maxSteps: 5, + callLLM: llm.fn, + isSuspendingTool: isAsk, + }); + + expect(outcome.status).toBe("suspended"); + if (outcome.status !== "suspended") return; + expect(outcome.pendingToolCalls).toHaveLength(1); + expect(outcome.pendingToolCalls[0].id).toBe("a1"); + expect(outcome.pendingToolCalls[0].arguments).toEqual({ to: "alice" }); + // The suspending tool's function is never invoked. + expect(llm.callCount()).toBe(1); + // Step is deferred to resume; assistant message is in state. + expect(outcome.state.steps).toHaveLength(0); + const last = outcome.state.messages.at(-1); + expect(last?.role).toBe("assistant"); + expect(last?.toolCalls?.[0].id).toBe("a1"); + }); + + it("runs synchronous tools in a mixed turn but parks on the suspending one", async () => { + let lookupRan = false; + const llm = scriptedLLM([ + llmCalls([call("s1", "lookup"), call("a1", "ask", { to: "bob" })]), + ]); + const outcome = await runAgentLoop({ + userMessage: "hi", + tools: [ + syncTool("lookup", () => { + lookupRan = true; + return "sync-result"; + }), + ], + maxSteps: 5, + callLLM: llm.fn, + isSuspendingTool: isAsk, + }); + + expect(outcome.status).toBe("suspended"); + if (outcome.status !== "suspended") return; + expect(lookupRan).toBe(true); + expect(outcome.pendingToolCalls.map((c) => c.id)).toEqual(["a1"]); + // The sync tool result is already in the conversation, ready for resume. + const toolMsgs = outcome.state.messages.filter((m) => m.role === "tool"); + expect(toolMsgs).toHaveLength(1); + expect(toolMsgs[0].content).toBe("sync-result"); + expect(toolMsgs[0].toolCallId).toBe("s1"); + }); + + it("collects a parallel fan-out of suspending calls", async () => { + const llm = scriptedLLM([ + llmCalls([ + call("a1", "ask", { to: "alice" }), + call("a2", "ask", { to: "bob" }), + call("a3", "ask", { to: "carol" }), + ]), + ]); + const outcome = await runAgentLoop({ + userMessage: "hi", + tools: [], + maxSteps: 5, + callLLM: llm.fn, + isSuspendingTool: isAsk, + }); + + expect(outcome.status).toBe("suspended"); + if (outcome.status !== "suspended") return; + expect(outcome.pendingToolCalls.map((c) => c.id)).toEqual([ + "a1", + "a2", + "a3", + ]); + }); +}); + +// ── Resume ─────────────────────────────────────────────────────────────────── + +describe("runAgentLoop — resume", () => { + it("injects results, records the suspended step, and runs to completion", async () => { + const llm = scriptedLLM([ + llmCalls([call("a1", "ask", { to: "alice" })]), + llmText("all done"), + ]); + const shared = { + maxSteps: 5, + tools: [], + callLLM: llm.fn, + isSuspendingTool: isAsk, + }; + + const suspended = await runAgentLoop({ userMessage: "hi", ...shared }); + expect(suspended.status).toBe("suspended"); + if (suspended.status !== "suspended") return; + + const resumed = await runAgentLoop({ + userMessage: "hi", + ...shared, + resumeState: suspended.state, + resumeToolResults: [ + { toolCallId: "a1", toolName: "ask", content: "alice says yes" }, + ], + }); + + expect(resumed.status).toBe("completed"); + if (resumed.status !== "completed") return; + expect(resumed.text).toBe("all done"); + expect(resumed.totalSteps).toBe(1); + // The recorded step carries the resolved async result. + expect(resumed.steps[0].toolResults).toHaveLength(1); + expect(resumed.steps[0].toolResults[0].content).toBe("alice says yes"); + expect(resumed.steps[0].toolResults[0].toolCallId).toBe("a1"); + // The follow-up LLM call saw the injected tool result. + const lastCall = llm.calls.at(-1); + expect(lastCall?.messages.at(-1)?.content).toBe("alice says yes"); + }); + + it("records sync + async results together for a mixed suspended turn", async () => { + const llm = scriptedLLM([ + llmCalls([call("s1", "lookup"), call("a1", "ask")]), + llmText("done"), + ]); + const shared = { + maxSteps: 5, + tools: [syncTool("lookup", () => "sync")], + callLLM: llm.fn, + isSuspendingTool: isAsk, + }; + + const suspended = await runAgentLoop({ userMessage: "hi", ...shared }); + if (suspended.status !== "suspended") throw new Error("expected suspend"); + + const resumed = await runAgentLoop({ + userMessage: "hi", + ...shared, + resumeState: suspended.state, + resumeToolResults: [ + { toolCallId: "a1", toolName: "ask", content: "async" }, + ], + }); + + if (resumed.status !== "completed") throw new Error("expected completion"); + expect(resumed.steps[0].toolResults.map((r) => r.content)).toEqual([ + "sync", + "async", + ]); + }); + + it("resumes a parallel fan-out with one reply and one timeout sentinel", async () => { + const llm = scriptedLLM([ + llmCalls([ + call("a1", "ask", { to: "alice" }), + call("a2", "ask", { to: "bob" }), + ]), + llmText("summary"), + ]); + const shared = { + maxSteps: 5, + tools: [], + callLLM: llm.fn, + isSuspendingTool: isAsk, + }; + + const suspended = await runAgentLoop({ userMessage: "hi", ...shared }); + if (suspended.status !== "suspended") throw new Error("expected suspend"); + + const resumed = await runAgentLoop({ + userMessage: "hi", + ...shared, + resumeState: suspended.state, + resumeToolResults: [ + { toolCallId: "a1", toolName: "ask", content: "alice replied" }, + { toolCallId: "a2", toolName: "ask", content: "(no reply received)" }, + ], + }); + + if (resumed.status !== "completed") throw new Error("expected completion"); + expect(resumed.text).toBe("summary"); + expect(resumed.steps[0].toolResults.map((r) => r.content)).toEqual([ + "alice replied", + "(no reply received)", + ]); + }); + + it("supports multiple suspend/resume cycles", async () => { + const llm = scriptedLLM([ + llmCalls([call("a1", "ask")]), // turn 1 -> suspend + llmCalls([call("a2", "ask")]), // turn 2 -> suspend again + llmText("finished"), // turn 3 -> complete + ]); + const shared = { + maxSteps: 10, + tools: [], + callLLM: llm.fn, + isSuspendingTool: isAsk, + }; + + const s1 = await runAgentLoop({ userMessage: "hi", ...shared }); + if (s1.status !== "suspended") throw new Error("expected suspend 1"); + + const s2 = await runAgentLoop({ + userMessage: "hi", + ...shared, + resumeState: s1.state, + resumeToolResults: [{ toolCallId: "a1", toolName: "ask", content: "r1" }], + }); + if (s2.status !== "suspended") throw new Error("expected suspend 2"); + + const done = await runAgentLoop({ + userMessage: "hi", + ...shared, + resumeState: s2.state, + resumeToolResults: [{ toolCallId: "a2", toolName: "ask", content: "r2" }], + }); + + if (done.status !== "completed") throw new Error("expected completion"); + expect(done.text).toBe("finished"); + expect(done.totalSteps).toBe(2); + expect(done.steps.map((s) => s.toolResults[0].content)).toEqual([ + "r1", + "r2", + ]); + }); + + it("throws when resuming without a suspended assistant turn", async () => { + const llm = scriptedLLM([llmText("unused")]); + await expect( + runAgentLoop({ + userMessage: "hi", + tools: [], + maxSteps: 5, + callLLM: llm.fn, + isSuspendingTool: isAsk, + resumeState: { + messages: [{ role: "user", content: "hi" }], + steps: [], + totalInputTokens: 0, + totalOutputTokens: 0, + }, + resumeToolResults: [ + { toolCallId: "x", toolName: "ask", content: "orphan" }, + ], + }) + ).rejects.toThrow(/no suspended assistant turn/); + }); +}); diff --git a/packages/runtime/src/utils/agent-loop.ts b/packages/runtime/src/utils/agent-loop.ts index 767d9736..1b66783e 100644 --- a/packages/runtime/src/utils/agent-loop.ts +++ b/packages/runtime/src/utils/agent-loop.ts @@ -76,6 +76,38 @@ export interface AgentLoopConfig { /** If provided, resume from a previous state instead of starting fresh */ resumeState?: AgentLoopState; + + /** + * Predicate marking a tool as *suspending*: its result is not available + * synchronously. When the model calls one or more suspending tools in a turn, + * the loop executes any non-suspending tool calls in that same turn, then + * stops and returns an {@link AgentLoopSuspension} instead of an + * {@link AgentLoopResult}. The caller performs the long-running work (e.g. + * emailing an interlocutor and parking until a reply arrives) and later + * resumes by calling the loop again with `resumeState` plus + * `resumeToolResults` for the suspended calls. + * + * Suspending and non-suspending tool calls can be mixed in a single turn; the + * non-suspending ones run immediately so their results are ready on resume. + */ + isSuspendingTool?: (toolName: string) => boolean; + + /** + * Results for the tool calls that suspended the loop, supplied when resuming. + * Must be provided together with a `resumeState` returned from a prior + * suspension. Each entry resolves one of the suspension's `pendingToolCalls`. + */ + resumeToolResults?: ResolvedToolResult[]; +} + +/** Resolved result for a previously-suspended tool call, supplied on resume. */ +export interface ResolvedToolResult { + /** Matches {@link AgentToolCall.id} of the suspended call */ + toolCallId: string; + /** Name of the tool that was called */ + toolName: string; + /** The tool result content (e.g. the interlocutor's reply text) */ + content: string; } export interface AgentLoopState { @@ -86,6 +118,8 @@ export interface AgentLoopState { } export interface AgentLoopResult { + /** Discriminant: the loop ran to completion */ + status: "completed"; /** Final text output from the agent */ text: string; /** All steps taken (tool call + result pairs) */ @@ -99,11 +133,29 @@ export interface AgentLoopResult { totalOutputTokens: number; } +/** + * Returned instead of {@link AgentLoopResult} when the model called one or more + * suspending tools (see {@link AgentLoopConfig.isSuspendingTool}). The caller + * must fulfil every `pendingToolCall`, persist `state`, and later resume by + * calling {@link runAgentLoop} with `resumeState: state` and matching + * `resumeToolResults`. + */ +export interface AgentLoopSuspension { + /** Discriminant: the loop is parked awaiting async tool results */ + status: "suspended"; + /** Tool calls whose results must be supplied before the loop can resume */ + pendingToolCalls: AgentToolCall[]; + /** Loop state to persist and pass back as `resumeState` on resume */ + state: AgentLoopState; +} + +export type AgentLoopOutcome = AgentLoopResult | AgentLoopSuspension; + // ── Core loop ──────────────────────────────────────────────────────────── export async function runAgentLoop( config: AgentLoopConfig -): Promise { +): Promise { const { userMessage, tools, maxSteps, callLLM, onStepComplete } = config; const finalLLM = config.callFinalLLM ?? callLLM; @@ -115,6 +167,17 @@ export async function runAgentLoop( totalOutputTokens: 0, }; + // Resuming a suspended turn: inject the async tool results for the calls that + // parked the loop, completing the in-flight turn before continuing. The + // suspended assistant message and any synchronous tool results are already in + // `state.messages`; we append the resolved results and record the step. + if (config.resumeState && config.resumeToolResults) { + recordSuspendedStep(state, config.resumeToolResults); + if (onStepComplete) { + await onStepComplete(state); + } + } + let finishReason: FinishReason = "completed"; while (state.steps.length < maxSteps) { @@ -154,9 +217,17 @@ export async function runAgentLoop( }; state.messages.push(assistantMessage); - // Execute each tool call + // Execute each tool call. Suspending tools are not run here — their results + // arrive asynchronously, so we collect them and park the loop once the + // synchronous calls in this turn have settled. const toolResults: AgentMessage[] = []; + const pendingToolCalls: AgentToolCall[] = []; for (const toolCall of llmResponse.toolCalls) { + if (config.isSuspendingTool?.(toolCall.name)) { + pendingToolCalls.push(toolCall); + continue; + } + const toolDef = tools.find((t) => t.name === toolCall.name); let resultContent: string; @@ -185,6 +256,14 @@ export async function runAgentLoop( state.messages.push(toolMessage); } + // Park the loop when the turn requested async results. The assistant message + // and any synchronous tool results are already in `state.messages`; the + // caller persists `state`, fulfils the pending calls, and resumes via + // `resumeState` + `resumeToolResults`. The step is recorded on resume. + if (pendingToolCalls.length > 0) { + return { status: "suspended", pendingToolCalls, state }; + } + // Record this step const step: AgentStep = { stepNumber: state.steps.length + 1, @@ -245,6 +324,7 @@ export async function runAgentLoop( const text = lastAssistant?.content ?? ""; return { + status: "completed", text, steps: state.steps, finishReason, @@ -253,3 +333,43 @@ export async function runAgentLoop( totalOutputTokens: state.totalOutputTokens, }; } + +/** + * Completes a suspended turn on resume: appends the resolved tool results to the + * conversation and records the step. The suspended assistant message (with its + * tool calls) and any synchronous tool results from the same turn are already + * the trailing messages in `state.messages`, so the step is reconstructed from + * the tail: the last assistant message followed by every consecutive tool + * message after it. + */ +function recordSuspendedStep( + state: AgentLoopState, + resolved: ResolvedToolResult[] +): void { + for (const r of resolved) { + state.messages.push({ + role: "tool", + content: r.content, + toolCallId: r.toolCallId, + toolName: r.toolName, + }); + } + + let i = state.messages.length - 1; + while (i >= 0 && state.messages[i].role === "tool") { + i--; + } + + const assistantMessage = state.messages[i]; + if (!assistantMessage || assistantMessage.role !== "assistant") { + throw new Error( + "Cannot resume: no suspended assistant turn found in state.messages" + ); + } + + state.steps.push({ + stepNumber: state.steps.length + 1, + assistantMessage, + toolResults: state.messages.slice(i + 1), + }); +} diff --git a/packages/runtime/src/workflow-runtime.ts b/packages/runtime/src/workflow-runtime.ts index 46e02d7e..462be85a 100644 --- a/packages/runtime/src/workflow-runtime.ts +++ b/packages/runtime/src/workflow-runtime.ts @@ -68,8 +68,9 @@ export class WorkflowRuntime extends Runtime { return (await this.currentStep.do( name, WorkflowRuntime.defaultStepConfig, - // @ts-expect-error - TS2345: Cloudflare Workflows requires Serializable types - fn + // Cloudflare Workflows constrains step fns to Serializable returns; our + // runtime values are serializable at runtime but not in the type system. + fn as never )) as T; } @@ -94,8 +95,9 @@ export class WorkflowRuntime extends Runtime { return (await this.currentStep.do( name, WorkflowRuntime.defaultStepConfig, - // @ts-expect-error - TS2345: Cloudflare Workflows requires Serializable types - fn + // Cloudflare Workflows constrains step fns to Serializable returns; our + // runtime values are serializable at runtime but not in the type system. + fn as never )) as T; } @@ -111,11 +113,14 @@ export class WorkflowRuntime extends Runtime { if (!this.currentStep) { throw new Error("waitForNodeEvent called without workflow step context"); } - // @ts-expect-error - TS2344: Cloudflare Workflows requires Serializable constraint - const event = await this.currentStep.waitForEvent(name, { - type: eventType, - timeout, - }); + // Workflows constrains the event payload to Serializable and the timeout to + // the WorkflowSleepDuration template type; cast through a permissive + // signature since our payloads/timeouts are valid at runtime. + const waitForEvent = this.currentStep.waitForEvent as ( + name: string, + opts: { type: string; timeout?: string } + ) => Promise<{ payload: T }>; + const event = await waitForEvent(name, { type: eventType, timeout }); return event.payload as T; } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3198e546..c1c20d64 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -94,7 +94,7 @@ importers: devDependencies: '@cloudflare/vitest-pool-workers': specifier: ^0.13.5 - version: 0.13.5(@cloudflare/workers-types@4.20260511.1)(@vitest/runner@4.1.6)(@vitest/snapshot@4.1.6)(bufferutil@4.1.0)(utf-8-validate@6.0.6)(vitest@4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0))) + version: 0.13.5(@cloudflare/workers-types@4.20260511.1)(@vitest/runner@4.1.9)(@vitest/snapshot@4.1.9)(bufferutil@4.1.0)(utf-8-validate@6.0.6)(vitest@4.1.9(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0))) '@cloudflare/workers-types': specifier: ^4.20260511.1 version: 4.20260511.1 @@ -111,8 +111,8 @@ importers: specifier: ^5.9.3 version: 5.9.3 vitest: - specifier: ^4.1.6 - version: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) + specifier: ^4.1.9 + version: 4.1.9(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) wrangler: specifier: ^4.90.1 version: 4.90.1(@cloudflare/workers-types@4.20260511.1)(bufferutil@4.1.0)(utf-8-validate@6.0.6) @@ -298,8 +298,8 @@ importers: specifier: ^7.3.5 version: 7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0) vitest: - specifier: ^4.1.6 - version: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) + specifier: ^4.1.9 + version: 4.1.9(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) wrangler: specifier: ^4.90.1 version: 4.90.1(@cloudflare/workers-types@4.20260511.1)(bufferutil@4.1.0)(utf-8-validate@6.0.6) @@ -362,8 +362,8 @@ importers: specifier: ^7.3.5 version: 7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0) vitest: - specifier: ^4.1.6 - version: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) + specifier: ^4.1.9 + version: 4.1.9(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) wrangler: specifier: ^4.90.1 version: 4.90.1(@cloudflare/workers-types@4.20260511.1)(bufferutil@4.1.0)(utf-8-validate@6.0.6) @@ -393,8 +393,8 @@ importers: specifier: ^5.9.3 version: 5.9.3 vitest: - specifier: ^4.1.6 - version: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) + specifier: ^4.1.9 + version: 4.1.9(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) packages/runtime: dependencies: @@ -481,8 +481,8 @@ importers: specifier: ^5.9.3 version: 5.9.3 vitest: - specifier: ^4.1.6 - version: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) + specifier: ^4.1.9 + version: 4.1.9(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) packages/types: devDependencies: @@ -3166,11 +3166,11 @@ packages: peerDependencies: vite: ^4.2.0 || ^5.0.0 || ^6.0.0 || ^7.0.0 || ^8.0.0 - '@vitest/expect@4.1.6': - resolution: {integrity: sha512-7EHDquPthALSV0jhhjgEW8FXaviMx7rSqu8W6oqCoAuOhKov814P99QDV1pxMA3QPv21YudvJngIhjrNI4opLg==} + '@vitest/expect@4.1.9': + resolution: {integrity: sha512-vl/rYsUKcBr3SnQn166+XR5ZQcgMx3DQhFWdfli/cWpLnLUmbxZvyrJZotLFUryib+LtArYMSTJ5RbQ57ZqrlA==} - '@vitest/mocker@4.1.6': - resolution: {integrity: sha512-MCFc63czMjEInOlcY2cpQCvCN+KgbAn+60xu9cMgP4sKaLC5JNAKw7JH8QdAnoAC88hW1IiSNZ+GgVXlN1UcMQ==} + '@vitest/mocker@4.1.9': + resolution: {integrity: sha512-EVkXzBjrPGM+cK8/ANWgBrkUCfJfb38/EfTSO8h7pWvKkyPkpWxvR7BkD2MyItMF62C97zAEoqdpUixwR/e+Rw==} peerDependencies: msw: ^2.4.9 vite: ^6.0.0 || ^7.0.0 || ^8.0.0 @@ -3180,20 +3180,20 @@ packages: vite: optional: true - '@vitest/pretty-format@4.1.6': - resolution: {integrity: sha512-h5SxD/IzNhZYnrSZRsUZQIC+vD0GY8cUvq0iwsmkFKixRCKLLWqCXa/FIQ4S1R+sI+PGoojkHsdNrbZiM9Qpgw==} + '@vitest/pretty-format@4.1.9': + resolution: {integrity: sha512-s0iufns3iIFitdgm+YR7g1whCAaGtXz459VS9/PqyKDEEFgYIhsHOQmXgIgDuYCt7DeQmiZT0Qe2OA2p4ZPu5A==} - '@vitest/runner@4.1.6': - resolution: {integrity: sha512-nOPCmn2+yD0ZNmKdsXGv/UxMMWbMuKeD6GyYncNwdkYDxpQvrPSKYj2rWuDjC2Y4b6w6hjip5dBKFzEUuZe3vA==} + '@vitest/runner@4.1.9': + resolution: {integrity: sha512-KXLMDtc7oe70+3mJfGrPUWPesswH+3sTxAMAMl8DG7I8IUQT4XW718dY5ID3vPUcmlu27CcKfY4P3h3I29SLJg==} - '@vitest/snapshot@4.1.6': - resolution: {integrity: sha512-YhsdE6xAVfTDmzjxL2ZDUvjj+ZsgyOKe+TdQzqkD72wIOmHka8NuGQ6NpTNZv9D2Z63fbwWKJPeVpEw4EQgYxw==} + '@vitest/snapshot@4.1.9': + resolution: {integrity: sha512-Jc7RKGNBo8Z28WYIm0Niej4xdSPByRf6mU58VpHQkd6Zh05rlnA+twjbK5HyeIGHxrzsc3mJgS43uM0CZKzaIA==} - '@vitest/spy@4.1.6': - resolution: {integrity: sha512-JFKxMx6udhwKh/Ldo270e17QX710vgunMkuPAvXjHSvC6oqLWAHhVhjg/I71q0u0CBSErIODV1Kjv0FQNSWjdg==} + '@vitest/spy@4.1.9': + resolution: {integrity: sha512-fHpsS6mIi+PiEW+vcRVOMkX1oSaPKne3VOclSFICPcGOmfKgXPU5iAah+wcNcj2xPrCCmfq99IDGf+EojhhvhA==} - '@vitest/utils@4.1.6': - resolution: {integrity: sha512-FxIY+U81R3LGKCxaHHFRQ5+g6/iRgGLmeHWdp2Amj4ljQRrEIWHmZyDfDYBRZlpyqA7qKxtS9DD1dhk8RnRIVQ==} + '@vitest/utils@4.1.9': + resolution: {integrity: sha512-A51o8ymO5PpqlWNnBP9ZHPXDIpuMtTLlGSjN7la4US+LJzoUMyhwjA5QXlm39JexgwHKW4Xjs8Z2d3dLCXOeuA==} '@webgpu/types@0.1.70': resolution: {integrity: sha512-LFiNHHKMvmAEvwVew3JLJmTdShhbdwRFSImUshGhE2mGE8ybQzIo63l5uRp+YKnNx+8Qno8Kf6gN+DKMreIJCA==} @@ -4506,8 +4506,9 @@ packages: resolution: {integrity: sha512-W67iLl4J2EXEGTbfeHCffrjDfitvLANg0UlX3wFUUSTx92KXRFegMHUVgSqE+wvhAbi4WqjGg9czysTV2Epbew==} engines: {node: '>= 0.4'} - obug@2.1.1: - resolution: {integrity: sha512-uTqF9MuPraAQ+IsnPf366RG4cP9RtUi7MLO1N3KEc+wb0a6yKpeL0lmk2IB1jY5KHPAlTc6T/JRdC/YqxHNwkQ==} + obug@2.1.3: + resolution: {integrity: sha512-9miFgM2OFba7hB+pRgvtV84pYTBaoTHohvmIgiRt6dRIzbwEOIaNaP+dIlGs2fNFoB0SeISs0Jz5WFVRid6Xyg==} + engines: {node: '>=12.20.0'} on-finished@2.4.1: resolution: {integrity: sha512-oVlzkg3ENAhCk2zdv7IJwd/QUD4z2RxRwpkcGY8psCVcCYZNq4wYnVWALHM+brtuJjePWiYF/ClmuDr8Ch5+kg==} @@ -5036,14 +5037,18 @@ packages: tinybench@2.9.0: resolution: {integrity: sha512-0+DUvqWMValLmha6lr4kD8iAMK1HzV0/aKnCtWb9v9641TnP/MFb7Pc2bxoxQjTXAErryXVgUOfv2YqNllqGeg==} - tinyexec@1.1.2: - resolution: {integrity: sha512-dAqSqE/RabpBKI8+h26GfLq6Vb3JVXs30XYQjdMjaj/c2tS8IYYMbIzP599KtRj7c57/wYApb3QjgRgXmrCukA==} + tinyexec@1.2.4: + resolution: {integrity: sha512-SHf/r48b7vOrjve9PxJo3MN5v5yuyjHvdUcrQffT3WXMUfnGmHDVbC4k3sHJaJTgZCwpUplIaAo5ANtMyp3YHg==} engines: {node: '>=18'} tinyglobby@0.2.16: resolution: {integrity: sha512-pn99VhoACYR8nFHhxqix+uvsbXineAasWm5ojXoN8xEwK5Kd3/TrhNn1wByuD52UxWRLy8pu+kRMniEi6Eq9Zg==} engines: {node: '>=12.0.0'} + tinyglobby@0.2.17: + resolution: {integrity: sha512-wXR/dYpcqKmfWpEdZjiKJOwCNFndD0DMnrW/cYjVGttEkBfVgcLFHoNrlj47mjOVic9yyNu65alsgF4NQyTa2g==} + engines: {node: '>=12.0.0'} + tinyqueue@2.0.3: resolution: {integrity: sha512-ppJZNDuKGgxzkHihX8v9v9G5f+18gzaTfrukGrq6ueg0lmH4nqVnA2IPG0AEH3jKEk2GRJCUhDoqpoiw3PHLBA==} @@ -5247,20 +5252,20 @@ packages: yaml: optional: true - vitest@4.1.6: - resolution: {integrity: sha512-6lvjbS3p9b4CrdCmguzbh2/4uoXhGE2q71R4OX5sqF9R1bo9Xd6fGrMAfvp5wnCzlBnFVdCOp6onuTQVbo8iUQ==} + vitest@4.1.9: + resolution: {integrity: sha512-nE3/LEyc0z87uHYLZebqCUOaJr2hdtuPp7BQ4BosVFnfltxgAvMG08NyrSGlPpOUWvR27c5flSmYFTNr78L9GQ==} engines: {node: ^20.0.0 || ^22.0.0 || >=24.0.0} hasBin: true peerDependencies: '@edge-runtime/vm': '*' '@opentelemetry/api': ^1.9.0 '@types/node': ^20.0.0 || ^22.0.0 || >=24.0.0 - '@vitest/browser-playwright': 4.1.6 - '@vitest/browser-preview': 4.1.6 - '@vitest/browser-webdriverio': 4.1.6 - '@vitest/coverage-istanbul': 4.1.6 - '@vitest/coverage-v8': 4.1.6 - '@vitest/ui': 4.1.6 + '@vitest/browser-playwright': 4.1.9 + '@vitest/browser-preview': 4.1.9 + '@vitest/browser-webdriverio': 4.1.9 + '@vitest/coverage-istanbul': 4.1.9 + '@vitest/coverage-v8': 4.1.9 + '@vitest/ui': 4.1.9 happy-dom: '*' jsdom: '*' vite: ^6.0.0 || ^7.0.0 || ^8.0.0 @@ -5775,14 +5780,14 @@ snapshots: - utf-8-validate - workerd - '@cloudflare/vitest-pool-workers@0.13.5(@cloudflare/workers-types@4.20260511.1)(@vitest/runner@4.1.6)(@vitest/snapshot@4.1.6)(bufferutil@4.1.0)(utf-8-validate@6.0.6)(vitest@4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)))': + '@cloudflare/vitest-pool-workers@0.13.5(@cloudflare/workers-types@4.20260511.1)(@vitest/runner@4.1.9)(@vitest/snapshot@4.1.9)(bufferutil@4.1.0)(utf-8-validate@6.0.6)(vitest@4.1.9(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)))': dependencies: - '@vitest/runner': 4.1.6 - '@vitest/snapshot': 4.1.6 + '@vitest/runner': 4.1.9 + '@vitest/snapshot': 4.1.9 cjs-module-lexer: 1.4.3 esbuild: 0.28.1 miniflare: 4.20260317.3(bufferutil@4.1.0)(utf-8-validate@6.0.6) - vitest: 4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) + vitest: 4.1.9(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) wrangler: 4.78.0(@cloudflare/workers-types@4.20260511.1)(bufferutil@4.1.0)(utf-8-validate@6.0.6) zod: 3.25.76 transitivePeerDependencies: @@ -8654,44 +8659,44 @@ snapshots: transitivePeerDependencies: - supports-color - '@vitest/expect@4.1.6': + '@vitest/expect@4.1.9': dependencies: '@standard-schema/spec': 1.1.0 '@types/chai': 5.2.3 - '@vitest/spy': 4.1.6 - '@vitest/utils': 4.1.6 + '@vitest/spy': 4.1.9 + '@vitest/utils': 4.1.9 chai: 6.2.2 tinyrainbow: 3.1.0 - '@vitest/mocker@4.1.6(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0))': + '@vitest/mocker@4.1.9(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0))': dependencies: - '@vitest/spy': 4.1.6 + '@vitest/spy': 4.1.9 estree-walker: 3.0.3 magic-string: 0.30.21 optionalDependencies: vite: 7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0) - '@vitest/pretty-format@4.1.6': + '@vitest/pretty-format@4.1.9': dependencies: tinyrainbow: 3.1.0 - '@vitest/runner@4.1.6': + '@vitest/runner@4.1.9': dependencies: - '@vitest/utils': 4.1.6 + '@vitest/utils': 4.1.9 pathe: 2.0.3 - '@vitest/snapshot@4.1.6': + '@vitest/snapshot@4.1.9': dependencies: - '@vitest/pretty-format': 4.1.6 - '@vitest/utils': 4.1.6 + '@vitest/pretty-format': 4.1.9 + '@vitest/utils': 4.1.9 magic-string: 0.30.21 pathe: 2.0.3 - '@vitest/spy@4.1.6': {} + '@vitest/spy@4.1.9': {} - '@vitest/utils@4.1.6': + '@vitest/utils@4.1.9': dependencies: - '@vitest/pretty-format': 4.1.6 + '@vitest/pretty-format': 4.1.9 convert-source-map: 2.0.0 tinyrainbow: 3.1.0 @@ -10074,7 +10079,7 @@ snapshots: object-inspect@1.13.4: {} - obug@2.1.1: {} + obug@2.1.3: {} on-finished@2.4.1: dependencies: @@ -10708,13 +10713,18 @@ snapshots: tinybench@2.9.0: {} - tinyexec@1.1.2: {} + tinyexec@1.2.4: {} tinyglobby@0.2.16: dependencies: fdir: 6.5.0(picomatch@4.0.4) picomatch: 4.0.4 + tinyglobby@0.2.17: + dependencies: + fdir: 6.5.0(picomatch@4.0.4) + picomatch: 4.0.4 + tinyqueue@2.0.3: {} tinyrainbow@3.1.0: {} @@ -10922,25 +10932,25 @@ snapshots: tsx: 4.21.0 yaml: 2.9.0 - vitest@4.1.6(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)): + vitest@4.1.9(@opentelemetry/api@1.9.1)(@types/node@22.19.19)(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)): dependencies: - '@vitest/expect': 4.1.6 - '@vitest/mocker': 4.1.6(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) - '@vitest/pretty-format': 4.1.6 - '@vitest/runner': 4.1.6 - '@vitest/snapshot': 4.1.6 - '@vitest/spy': 4.1.6 - '@vitest/utils': 4.1.6 + '@vitest/expect': 4.1.9 + '@vitest/mocker': 4.1.9(vite@7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0)) + '@vitest/pretty-format': 4.1.9 + '@vitest/runner': 4.1.9 + '@vitest/snapshot': 4.1.9 + '@vitest/spy': 4.1.9 + '@vitest/utils': 4.1.9 es-module-lexer: 2.1.0 expect-type: 1.3.0 magic-string: 0.30.21 - obug: 2.1.1 + obug: 2.1.3 pathe: 2.0.3 picomatch: 4.0.4 std-env: 4.1.0 tinybench: 2.9.0 - tinyexec: 1.1.2 - tinyglobby: 0.2.16 + tinyexec: 1.2.4 + tinyglobby: 0.2.17 tinyrainbow: 3.1.0 vite: 7.3.5(@types/node@22.19.19)(jiti@2.7.0)(lightningcss@1.32.0)(tsx@4.21.0)(yaml@2.9.0) why-is-node-running: 2.3.0