Skip to content

Commit 9234877

Browse files
bchapuisclaude
andcommitted
Add stateful agent conversations via Cloudflare Agents SDK
Migrate AgentRunner from raw DurableObject to the Agents SDK Agent base class, enabling persistent conversation history across workflow runs. When an agent_id input is set, the same DO instance is reused so prior messages are carried forward; without it, behavior is unchanged. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b7e6966 commit 9234877

5 files changed

Lines changed: 9341 additions & 3397 deletions

File tree

apps/api/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
"@mapbox/martini": "^0.2.0",
5555
"@turf/helpers": "^7.3.2",
5656
"@turf/turf": "^7.3.2",
57+
"agents": "^0.1.6",
5758
"aws4fetch": "^1.0.20",
5859
"cloudflare": "^5.2.0",
5960
"cron-parser": "^5.4.0",

apps/api/src/durable-objects/agent-runner.ts

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
* Supports four LLM providers: anthropic, google, openai, workers-ai.
99
*/
1010

11-
import { DurableObject } from "cloudflare:workers";
1211
import Anthropic from "@anthropic-ai/sdk";
1312
import type { ToolDefinition, ToolReference } from "@dafthunk/runtime";
1413
import { NodeToolProvider } from "@dafthunk/runtime";
1514
import type { AgentProvider } from "@dafthunk/runtime/nodes/agent/base-agent-node";
1615
import type {
1716
AgentLoopResult,
17+
AgentLoopState,
1818
AgentMessage,
1919
LLMResponse,
2020
} from "@dafthunk/runtime/utils/agent-loop";
@@ -28,6 +28,7 @@ import { createCodeModeToolDefinition } from "@dafthunk/runtime/utils/code-mode"
2828
import type { TokenPricing } from "@dafthunk/runtime/utils/usage";
2929
import { calculateTokenUsage } from "@dafthunk/runtime/utils/usage";
3030
import { GoogleGenAI } from "@google/genai";
31+
import { Agent } from "agents";
3132
import OpenAI from "openai";
3233

3334
import type { Bindings } from "../context";
@@ -69,6 +70,8 @@ export interface AgentRunRequest {
6970
googleSearch?: boolean;
7071
/** Organization ID for credential access (integrations, secrets) */
7172
organizationId: string;
73+
/** When set, routes to a persistent DO that maintains state across runs */
74+
agentId?: string;
7275
}
7376

7477
export interface AgentRunResponse {
@@ -78,11 +81,21 @@ export interface AgentRunResponse {
7881
totalSteps: number;
7982
totalInputTokens: number;
8083
totalOutputTokens: number;
84+
/** Full message history (only present when agentId is set) */
85+
agentMessages?: AgentMessage[];
86+
}
87+
88+
// ── Persistent state for stateful conversations ─────────────────────────
89+
90+
export interface AgentRunnerState {
91+
messages: AgentMessage[];
92+
totalInputTokens: number;
93+
totalOutputTokens: number;
8194
}
8295

8396
// ── Durable Object ───────────────────────────────────────────────────────
8497

85-
export class AgentRunner extends DurableObject<Bindings> {
98+
export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
8699
private initialized = false;
87100

88101
constructor(ctx: DurableObjectState, env: Bindings) {
@@ -199,6 +212,9 @@ export class AgentRunner extends DurableObject<Bindings> {
199212
// Build built-in Gemini tools (only effective for google provider)
200213
const geminiBuiltInTools = this.buildGeminiBuiltInTools(body);
201214

215+
// Build resume state from persisted conversation (if stateful)
216+
const resumeState = this.buildResumeState(body.agentId, userMessage);
217+
202218
// Run the agent loop
203219
const result = await runAgentLoop({
204220
userMessage,
@@ -220,15 +236,24 @@ export class AgentRunner extends DurableObject<Bindings> {
220236
runId
221237
);
222238
},
239+
resumeState,
223240
});
224241

242+
// Persist conversation state for stateful sessions
243+
const agentMessages = this.persistConversationState(
244+
body.agentId,
245+
userMessage,
246+
result
247+
);
248+
225249
const response: AgentRunResponse = {
226250
text: result.text,
227251
steps: result.steps,
228252
finishReason: result.finishReason,
229253
totalSteps: result.totalSteps,
230254
totalInputTokens: result.totalInputTokens,
231255
totalOutputTokens: result.totalOutputTokens,
256+
...(agentMessages && { agentMessages }),
232257
};
233258

234259
// Cache the completed result
@@ -350,6 +375,9 @@ export class AgentRunner extends DurableObject<Bindings> {
350375

351376
const geminiBuiltInTools = this.buildGeminiBuiltInTools(body);
352377

378+
// Build resume state from persisted conversation (if stateful)
379+
const resumeState = this.buildResumeState(body.agentId, userMessage);
380+
353381
const result = await runAgentLoop({
354382
userMessage,
355383
tools: toolDefinitions,
@@ -370,15 +398,24 @@ export class AgentRunner extends DurableObject<Bindings> {
370398
runId
371399
);
372400
},
401+
resumeState,
373402
});
374403

404+
// Persist conversation state for stateful sessions
405+
const agentMessages = this.persistConversationState(
406+
body.agentId,
407+
userMessage,
408+
result
409+
);
410+
375411
const response: AgentRunResponse = {
376412
text: result.text,
377413
steps: result.steps,
378414
finishReason: result.finishReason,
379415
totalSteps: result.totalSteps,
380416
totalInputTokens: result.totalInputTokens,
381417
totalOutputTokens: result.totalOutputTokens,
418+
...(agentMessages && { agentMessages }),
382419
};
383420

384421
// Cache the completed result
@@ -456,12 +493,70 @@ export class AgentRunner extends DurableObject<Bindings> {
456493
totalInputTokens: response.totalInputTokens,
457494
totalOutputTokens: response.totalOutputTokens,
458495
},
496+
...(response.agentMessages && {
497+
agent_messages: response.agentMessages,
498+
}),
459499
},
460500
usage,
461501
},
462502
});
463503
}
464504

505+
// ── Conversation state helpers ───────────────────────────────────────
506+
507+
/**
508+
* If a agentId is set and previous messages exist, builds a
509+
* resumeState so the agent loop continues from the prior conversation.
510+
*/
511+
private buildResumeState(
512+
agentId: string | undefined,
513+
userMessage: string
514+
): AgentLoopState | undefined {
515+
if (!agentId) return undefined;
516+
const prev = this.state?.messages;
517+
if (!prev || prev.length === 0) return undefined;
518+
519+
return {
520+
messages: [...prev, { role: "user" as const, content: userMessage }],
521+
steps: [],
522+
totalInputTokens: 0,
523+
totalOutputTokens: 0,
524+
};
525+
}
526+
527+
/**
528+
* After an agent run completes, persists the full conversation history
529+
* when agentId is set. Returns the conversation messages for
530+
* inclusion in the response, or undefined for ephemeral runs.
531+
*/
532+
private persistConversationState(
533+
agentId: string | undefined,
534+
userMessage: string,
535+
result: AgentLoopResult
536+
): AgentMessage[] | undefined {
537+
if (!agentId) return undefined;
538+
539+
const prevMessages = this.state?.messages ?? [];
540+
const newMessages: AgentMessage[] = [
541+
{ role: "user", content: userMessage },
542+
];
543+
for (const step of result.steps) {
544+
newMessages.push(step.assistantMessage);
545+
newMessages.push(...step.toolResults);
546+
}
547+
newMessages.push({ role: "assistant", content: result.text });
548+
549+
const allMessages = [...prevMessages, ...newMessages];
550+
this.setState({
551+
messages: allMessages,
552+
totalInputTokens:
553+
(this.state?.totalInputTokens ?? 0) + result.totalInputTokens,
554+
totalOutputTokens:
555+
(this.state?.totalOutputTokens ?? 0) + result.totalOutputTokens,
556+
});
557+
return allMessages;
558+
}
559+
465560
// ── Built-in Gemini tools ─────────────────────────────────────────────
466561

467562
private buildGeminiBuiltInTools(

apps/api/src/types/agents.d.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Type declarations for the Cloudflare Agents SDK (`agents` package).
3+
* Provides the Agent base class for Durable Objects with built-in state management.
4+
*/
5+
declare module "agents" {
6+
import { DurableObject } from "cloudflare:workers";
7+
import type { DurableObjectState } from "cloudflare:workers";
8+
9+
/**
10+
* Agent base class that extends DurableObject with persistent state management.
11+
* Use `this.state` to read and `this.setState()` to persist state across requests.
12+
*
13+
* Note: Agent extends partyserver's Server which adds WebSocket/room routing
14+
* in `fetch()`. Override `fetch()` directly if you only need state management
15+
* and want to bypass partyserver's header requirements.
16+
*/
17+
export class Agent<Env, State> extends DurableObject<Env> {
18+
constructor(ctx: DurableObjectState, env: Env);
19+
20+
/** Current persistent state (undefined until first setState call) */
21+
get state(): State | undefined;
22+
23+
/** Persist new state (replaces previous state) */
24+
setState(state: State): void;
25+
}
26+
}

packages/runtime/src/nodes/agent/base-agent-node.ts

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ export interface AgentNodeConfig {
1818

1919
/** Standard inputs shared by all agent nodes */
2020
const AGENT_INPUTS: NodeType["inputs"] = [
21+
{
22+
name: "agent_id",
23+
type: "string",
24+
description:
25+
"When set, routes to a persistent agent that maintains state and conversation history across runs",
26+
required: false,
27+
},
2128
{
2229
name: "instructions",
2330
type: "string",
@@ -106,6 +113,12 @@ const AGENT_OUTPUTS: NodeType["outputs"] = [
106113
description: "Token usage and cost information",
107114
hidden: true,
108115
},
116+
{
117+
name: "agent_messages",
118+
type: "json",
119+
description: "Full message history (only present when agent_id is set)",
120+
hidden: true,
121+
},
109122
];
110123

111124
/**
@@ -145,6 +158,7 @@ interface AgentRunResponse {
145158
totalSteps: number;
146159
totalInputTokens: number;
147160
totalOutputTokens: number;
161+
agentMessages?: unknown[];
148162
}
149163

150164
/** Event payload sent by AgentRunner DO when async execution completes */
@@ -155,6 +169,7 @@ export interface AgentCompleteEvent {
155169
total_steps: number;
156170
finish_reason: string;
157171
usage_metadata: { totalInputTokens: number; totalOutputTokens: number };
172+
agent_messages?: unknown[];
158173
};
159174
usage: number;
160175
error?: string;
@@ -192,9 +207,17 @@ export abstract class BaseAgentNode extends ExecutableNode {
192207
config: AgentNodeConfig
193208
): Promise<NodeExecution> {
194209
try {
195-
const { instructions, input, max_steps, tools, code_mode, googleSearch } =
196-
context.inputs;
210+
const {
211+
instructions,
212+
input,
213+
max_steps,
214+
tools,
215+
code_mode,
216+
googleSearch,
217+
agent_id,
218+
} = context.inputs;
197219
const agentContext = context.inputs.context as string | undefined;
220+
const agentId = agent_id as string | undefined;
198221

199222
if (!input) {
200223
return this.createErrorResult("Input is required");
@@ -206,7 +229,10 @@ export abstract class BaseAgentNode extends ExecutableNode {
206229

207230
const agentRunner = context.env.AGENT_RUNNER as DurableObjectNamespace;
208231
const runId = `${context.executionId}:${context.nodeId}`;
209-
const doId = agentRunner.idFromName(runId);
232+
// Stateful: route to a persistent DO keyed by agentId
233+
// Ephemeral: route to a per-execution DO (current behavior)
234+
const doName = agentId ? `agent:${agentId}` : runId;
235+
const doId = agentRunner.idFromName(doName);
210236
const stub = agentRunner.get(doId);
211237

212238
const response = await stub.fetch("https://agent/start", {
@@ -227,6 +253,7 @@ export abstract class BaseAgentNode extends ExecutableNode {
227253
codeMode: code_mode ?? false,
228254
googleSearch: googleSearch ?? false,
229255
organizationId: context.organizationId,
256+
agentId,
230257
}),
231258
});
232259

@@ -267,9 +294,17 @@ export abstract class BaseAgentNode extends ExecutableNode {
267294
config: AgentNodeConfig
268295
): Promise<NodeExecution> {
269296
try {
270-
const { instructions, input, max_steps, tools, code_mode, googleSearch } =
271-
context.inputs;
297+
const {
298+
instructions,
299+
input,
300+
max_steps,
301+
tools,
302+
code_mode,
303+
googleSearch,
304+
agent_id,
305+
} = context.inputs;
272306
const agentContext = context.inputs.context as string | undefined;
307+
const agentId = agent_id as string | undefined;
273308

274309
if (!input) {
275310
return this.createErrorResult("Input is required");
@@ -281,7 +316,10 @@ export abstract class BaseAgentNode extends ExecutableNode {
281316

282317
const agentRunner = context.env.AGENT_RUNNER as DurableObjectNamespace;
283318
const runId = `${context.executionId ?? context.workflowId}:${context.nodeId}`;
284-
const doId = agentRunner.idFromName(runId);
319+
// Stateful: route to a persistent DO keyed by agentId
320+
// Ephemeral: route to a per-execution DO (current behavior)
321+
const doName = agentId ? `agent:${agentId}` : runId;
322+
const doId = agentRunner.idFromName(doName);
285323
const stub = agentRunner.get(doId);
286324

287325
const response = await stub.fetch("https://agent/run", {
@@ -299,6 +337,7 @@ export abstract class BaseAgentNode extends ExecutableNode {
299337
codeMode: code_mode ?? false,
300338
googleSearch: googleSearch ?? false,
301339
organizationId: context.organizationId,
340+
agentId,
302341
}),
303342
});
304343

@@ -330,6 +369,9 @@ export abstract class BaseAgentNode extends ExecutableNode {
330369
totalInputTokens: result.totalInputTokens,
331370
totalOutputTokens: result.totalOutputTokens,
332371
},
372+
...(result.agentMessages && {
373+
agent_messages: result.agentMessages,
374+
}),
333375
},
334376
usage
335377
);

0 commit comments

Comments
 (0)