
Commit 0120577

bchapuis and claude committed
Add schema input to agent nodes for structured JSON output
Extends the agent loop with an optional callFinalLLM callback that applies schema constraints to the final output-producing LLM call. Uses prompt-based JSON schema enforcement for the Anthropic and Workers AI providers, and native response_format for OpenAI and Gemini. Also fixes the Workers AI LLM nodes to use the prompt-based approach instead of the unsupported response_format.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1266c03 commit 0120577

4 files changed

Lines changed: 164 additions & 23 deletions
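
For orientation, here is a minimal sketch of how the new schema input is meant to flow end to end. The field names follow the AgentRunRequest change in the diff below; the provider, model, and schema values are purely illustrative, not taken from the commit.

```typescript
// Hypothetical request to the agent runner — the `schema` field is the new addition.
// Field names mirror AgentRunRequest from the diff; the values are illustrative only.
const request /* : AgentRunRequest */ = {
  provider: "anthropic",
  model: "claude-sonnet-4-5",
  instructions: "Extract the company details from the user's message.",
  userMessage: "Acme Corp was founded in 1999 in Berlin.",
  schema: {
    fields: [
      { name: "company", type: "string" },
      { name: "founded", type: "number" },
      { name: "city", type: "string" },
    ],
  },
};
// The runner converts `schema` to JSON Schema, runs the normal tool-calling loop,
// then issues one extra schema-constrained call so the final assistant message is
// JSON such as {"company":"Acme Corp","founded":1999,"city":"Berlin"}.
```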

File tree

apps/api/src/durable-objects/agent-runner.ts
packages/runtime/src/nodes/agent/base-agent-node.ts
packages/runtime/src/nodes/text/execute-workers-ai-text-model.ts
packages/runtime/src/utils/agent-loop.ts

apps/api/src/durable-objects/agent-runner.ts

Lines changed: 104 additions & 12 deletions
@@ -25,8 +25,10 @@ import {
   getOpenAIConfig,
 } from "@dafthunk/runtime/utils/ai-gateway";
 import { createCodeModeToolDefinition } from "@dafthunk/runtime/utils/code-mode";
+import { schemaToJsonSchema } from "@dafthunk/runtime/utils/schema-to-json-schema";
 import type { TokenPricing } from "@dafthunk/runtime/utils/usage";
 import { calculateTokenUsage } from "@dafthunk/runtime/utils/usage";
+import type { Schema } from "@dafthunk/types";
 import { GoogleGenAI } from "@google/genai";
 import { Agent } from "agents";
 import OpenAI from "openai";
@@ -74,6 +76,8 @@ export interface AgentRunRequest {
   agentId?: string;
   /** Max number of previous messages to load from conversation history */
   maxHistory?: number;
+  /** Schema to constrain the final output format (structured JSON output) */
+  schema?: Record<string, unknown>;
 }
 
 export interface AgentRunResponse {
@@ -221,6 +225,28 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
       body.maxHistory ?? 50
     );
 
+    // Convert schema if provided
+    const jsonSchema =
+      body.schema &&
+      typeof body.schema === "object" &&
+      "fields" in body.schema
+        ? schemaToJsonSchema(body.schema as unknown as Schema)
+        : undefined;
+
+    // Build callFinalLLM that applies schema constraint on the final turn
+    const callFinalLLM = jsonSchema
+      ? (messages: AgentMessage[], tools: ToolDefinition[]) =>
+          this.callLLM(
+            body.provider,
+            body.model,
+            body.instructions,
+            messages,
+            tools,
+            geminiBuiltInTools,
+            jsonSchema
+          )
+      : undefined;
+
     // Run the agent loop
     const result = await runAgentLoop({
       userMessage,
@@ -235,6 +261,7 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
         tools,
         geminiBuiltInTools
       ),
+      callFinalLLM,
       onStepComplete: async (state) => {
         this.ctx.storage.sql.exec(
           `UPDATE agent_runs SET state = ?, updated_at = datetime('now') WHERE run_id = ?`,
@@ -388,6 +415,28 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
       body.maxHistory ?? 50
     );
 
+    // Convert schema if provided
+    const jsonSchema =
+      body.schema &&
+      typeof body.schema === "object" &&
+      "fields" in body.schema
+        ? schemaToJsonSchema(body.schema as unknown as Schema)
+        : undefined;
+
+    // Build callFinalLLM that applies schema constraint on the final turn
+    const callFinalLLM = jsonSchema
+      ? (messages: AgentMessage[], tools: ToolDefinition[]) =>
+          this.callLLM(
+            body.provider,
+            body.model,
+            body.instructions,
+            messages,
+            tools,
+            geminiBuiltInTools,
+            jsonSchema
+          )
+      : undefined;
+
     const result = await runAgentLoop({
       userMessage,
       tools: toolDefinitions,
@@ -401,6 +450,7 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
         tools,
         geminiBuiltInTools
       ),
+      callFinalLLM,
      onStepComplete: async (state) => {
        this.ctx.storage.sql.exec(
          `UPDATE agent_runs SET state = ?, updated_at = datetime('now') WHERE run_id = ?`,
@@ -639,23 +689,31 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
     instructions: string,
     messages: AgentMessage[],
     tools: ToolDefinition[],
-    builtInTools?: Record<string, unknown>[]
+    builtInTools?: Record<string, unknown>[],
+    schema?: Record<string, unknown>
   ): Promise<LLMResponse> {
     switch (provider) {
       case "anthropic":
-        return this.callAnthropic(model, instructions, messages, tools);
+        return this.callAnthropic(model, instructions, messages, tools, schema);
       case "google":
         return this.callGoogle(
           model,
           instructions,
           messages,
           tools,
-          builtInTools
+          builtInTools,
+          schema
         );
       case "openai":
-        return this.callOpenAI(model, instructions, messages, tools);
+        return this.callOpenAI(model, instructions, messages, tools, schema);
       case "workers-ai":
-        return this.callWorkersAI(model, instructions, messages, tools);
+        return this.callWorkersAI(
+          model,
+          instructions,
+          messages,
+          tools,
+          schema
+        );
       default:
         throw new Error(`Unsupported provider: ${provider}`);
     }
@@ -667,7 +725,8 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
     model: string,
     instructions: string,
     messages: AgentMessage[],
-    tools: ToolDefinition[]
+    tools: ToolDefinition[],
+    schema?: Record<string, unknown>
   ): Promise<LLMResponse> {
     const client = new Anthropic({
       apiKey: "gateway-managed",
@@ -716,11 +775,16 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
       input_schema: t.parameters as Anthropic.Tool.InputSchema,
     }));
 
+    // When schema is provided, append a JSON constraint to the system prompt
+    const systemPrompt = schema
+      ? `${instructions}\n\nYou MUST respond with valid JSON matching this schema:\n${JSON.stringify(schema)}`
+      : instructions;
+
     const response = await client.messages.create({
       model,
       max_tokens: 4096,
       messages: anthropicMessages,
-      ...(instructions && { system: instructions }),
+      ...(systemPrompt && { system: systemPrompt }),
       ...(anthropicTools.length > 0 && { tools: anthropicTools }),
     });
 
@@ -755,7 +819,8 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
     instructions: string,
     messages: AgentMessage[],
     tools: ToolDefinition[],
-    builtInTools?: Record<string, unknown>[]
+    builtInTools?: Record<string, unknown>[],
+    schema?: Record<string, unknown>
   ): Promise<LLMResponse> {
     const ai = new GoogleGenAI({
       apiKey: "gateway-managed",
@@ -815,6 +880,12 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
       config.tools = allTools;
     }
 
+    // Apply schema constraint for structured JSON output
+    if (schema) {
+      config.responseMimeType = "application/json";
+      config.responseSchema = schema;
+    }
+
     const response = await ai.models.generateContent({
       model,
       contents: contents as any,
@@ -859,7 +930,8 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
     model: string,
     instructions: string,
     messages: AgentMessage[],
-    tools: ToolDefinition[]
+    tools: ToolDefinition[],
+    schema?: Record<string, unknown>
   ): Promise<LLMResponse> {
     const client = new OpenAI({
       apiKey: "gateway-managed",
@@ -911,11 +983,24 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
       },
     }));
 
+    // Build response_format when a schema is provided
+    const responseFormat = schema
+      ? {
+          type: "json_schema" as const,
+          json_schema: {
+            name: "response",
+            schema,
+            strict: true,
+          },
+        }
+      : undefined;
+
     const completion = await client.chat.completions.create({
       model,
       max_tokens: 4096,
       messages: openaiMessages,
       ...(openaiTools.length > 0 && { tools: openaiTools }),
+      ...(responseFormat && { response_format: responseFormat }),
     });
 
     const choice = completion.choices[0];
@@ -948,13 +1033,20 @@ export class AgentRunner extends Agent<Bindings, AgentRunnerState> {
     model: string,
     _instructions: string,
     messages: AgentMessage[],
-    tools: ToolDefinition[]
+    tools: ToolDefinition[],
+    schema?: Record<string, unknown>
   ): Promise<LLMResponse> {
     // Workers AI uses OpenAI-compatible chat format
     const aiMessages: Array<{ role: string; content: string }> = [];
 
-    if (_instructions) {
-      aiMessages.push({ role: "system", content: _instructions });
+    // When schema is provided, append a JSON constraint to the system prompt
+    // (Workers AI models don't reliably support response_format)
+    const systemPrompt = schema
+      ? `${_instructions}\n\nYou MUST respond with valid JSON matching this schema:\n${JSON.stringify(schema)}`
+      : _instructions;
+
+    if (systemPrompt) {
+      aiMessages.push({ role: "system", content: systemPrompt });
     }
 
     for (const m of messages) {
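
One caveat for consumers: for the prompt-based providers (Anthropic, Workers AI), the schema is enforced by instruction only, so the final assistant message is a string that should parse as JSON but is not guaranteed to. The commit does not add parsing on the consumer side; a hypothetical defensive guard might look like this.

```typescript
// Hypothetical consumer-side parse of the final assistant message (not part of this commit).
// Prompt-based enforcement can still emit stray text or code fences around the JSON.
function parseStructuredOutput(finalText: string): unknown {
  try {
    return JSON.parse(finalText);
  } catch {
    // Try to extract a JSON object embedded in surrounding text before giving up.
    const match = finalText.match(/\{[\s\S]*\}/);
    if (match) {
      try {
        return JSON.parse(match[0]);
      } catch {
        /* fall through */
      }
    }
    return finalText; // fall back to the raw text
  }
}
```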

packages/runtime/src/nodes/agent/base-agent-node.ts

Lines changed: 12 additions & 2 deletions
@@ -77,6 +77,12 @@ const AGENT_INPUTS: NodeType["inputs"] = [
     hidden: true,
     value: false,
   },
+  {
+    name: "schema",
+    type: "schema",
+    description: "JSON schema to constrain the final output format",
+    hidden: true,
+  },
 ];
 
 /** Extra inputs for Gemini agent nodes — Google built-in tools */
@@ -95,8 +101,8 @@ export const GEMINI_BUILTIN_TOOL_INPUTS: NodeType["inputs"] = [
 const AGENT_OUTPUTS: NodeType["outputs"] = [
   {
     name: "text",
-    type: "string",
-    description: "Final text response from the agent",
+    type: "any",
+    description: "Final text or JSON response from the agent",
   },
   {
     name: "steps",
@@ -227,6 +233,7 @@ export abstract class BaseAgentNode extends ExecutableNode {
       code_mode,
       googleSearch,
       agent_id,
+      schema,
     } = context.inputs;
     const agentContext = context.inputs.context as string | undefined;
     const agentId = agent_id as string | undefined;
@@ -267,6 +274,7 @@ export abstract class BaseAgentNode extends ExecutableNode {
         googleSearch: googleSearch ?? false,
         organizationId: context.organizationId,
         agentId,
+        ...(schema && { schema }),
       }),
     });
 
@@ -316,6 +324,7 @@ export abstract class BaseAgentNode extends ExecutableNode {
       code_mode,
       googleSearch,
       agent_id,
+      schema,
     } = context.inputs;
     const agentContext = context.inputs.context as string | undefined;
     const agentId = agent_id as string | undefined;
@@ -353,6 +362,7 @@ export abstract class BaseAgentNode extends ExecutableNode {
         googleSearch: googleSearch ?? false,
         organizationId: context.organizationId,
         agentId,
+        ...(schema && { schema }),
       }),
     });
 
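
The `"fields" in body.schema` guard in agent-runner.ts suggests the node-level schema value is a field-list object rather than raw JSON Schema. A hypothetical example of that input shape, and one plausible JSON Schema that schemaToJsonSchema could produce from it (the actual conversion lives in @dafthunk/runtime/utils/schema-to-json-schema and is not shown in this commit):

```typescript
// Hypothetical value wired into the new "schema" input port.
const schemaInput = {
  fields: [
    { name: "title", type: "string" },
    { name: "score", type: "number" },
  ],
};

// One plausible JSON Schema equivalent — the real output shape depends on
// schemaToJsonSchema's implementation, which this commit only imports.
const jsonSchema = {
  type: "object",
  properties: {
    title: { type: "string" },
    score: { type: "number" },
  },
  required: ["title", "score"],
  additionalProperties: false,
};
```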

packages/runtime/src/nodes/text/execute-workers-ai-text-model.ts

Lines changed: 15 additions & 8 deletions
@@ -55,16 +55,23 @@ export async function executeWorkersAiTextModel(
     context
   );
 
-  // Build response_format when a schema is provided
+  // When schema is provided, prepend a JSON constraint to messages
+  // (Workers AI models don't reliably support response_format)
   const extraParams: Record<string, unknown> = { ...(config.params ?? {}) };
-  if (schemaInput && typeof schemaInput === "object" && "fields" in schemaInput) {
-    extraParams.response_format = {
-      type: "json_schema",
-      json_schema: {
-        name: "response",
-        schema: schemaToJsonSchema(schemaInput as Schema),
+  if (
+    schemaInput &&
+    typeof schemaInput === "object" &&
+    "fields" in schemaInput &&
+    parsedMessages
+  ) {
+    const jsonSchema = schemaToJsonSchema(schemaInput as Schema);
+    parsedMessages = [
+      {
+        role: "system",
+        content: `You MUST respond with valid JSON matching this schema:\n${JSON.stringify(jsonSchema)}`,
       },
-    };
+      ...parsedMessages,
+    ];
   }
 
   let result: any;

packages/runtime/src/utils/agent-loop.ts

Lines changed: 33 additions & 1 deletion
@@ -61,6 +61,16 @@ export interface AgentLoopConfig {
     tools: ToolDefinition[]
   ) => Promise<LLMResponse>;
 
+  /**
+   * Optional LLM call used for the final output-producing turn.
+   * When provided (e.g. to enforce a JSON schema), this replaces `callLLM`
+   * for the last call that generates the user-facing response.
+   */
+  callFinalLLM?: (
+    messages: AgentMessage[],
+    tools: ToolDefinition[]
+  ) => Promise<LLMResponse>;
+
   /** Called after each iteration so the caller can persist state */
   onStepComplete?: (state: AgentLoopState) => Promise<void>;
 
@@ -95,6 +105,7 @@
   config: AgentLoopConfig
 ): Promise<AgentLoopResult> {
   const { userMessage, tools, maxSteps, callLLM, onStepComplete } = config;
+  const finalLLM = config.callFinalLLM ?? callLLM;
 
   // Initialise or resume state
   const state: AgentLoopState = config.resumeState ?? {
@@ -188,11 +199,32 @@
     }
   }
 
+  // If the model completed normally and a callFinalLLM is provided, make one
+  // additional call with schema constraints to produce structured output.
+  // We pop the last assistant message so the model generates a fresh response
+  // with the schema constraint, rather than seeing its own unformatted reply.
+  if (finishReason === "completed" && config.callFinalLLM) {
+    state.messages.pop();
+
+    const formatResponse = await finalLLM(state.messages, []);
+    state.totalInputTokens += formatResponse.inputTokens;
+    state.totalOutputTokens += formatResponse.outputTokens;
+
+    state.messages.push({
+      role: "assistant",
+      content: formatResponse.content,
+    });
+
+    if (onStepComplete) {
+      await onStepComplete(state);
+    }
+  }
+
   // If we exhausted maxSteps, do one final LLM call without tools to summarise
   if (state.steps.length >= maxSteps) {
     finishReason = "max_steps_reached";
 
-    const finalResponse = await callLLM(state.messages, []);
+    const finalResponse = await finalLLM(state.messages, []);
     state.totalInputTokens += finalResponse.inputTokens;
     state.totalOutputTokens += finalResponse.outputTokens;
 
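
A minimal usage sketch of the new callFinalLLM hook, assuming a caller wires runAgentLoop up directly; `callProviderLLM`, `persistState`, and `jsonSchema` are illustrative stand-ins, not names from this commit.

```typescript
// Hypothetical wiring of runAgentLoop with the new callFinalLLM hook.
// `callProviderLLM` stands in for any provider-specific LLM call.
const result = await runAgentLoop({
  userMessage: "Summarise the three open issues as JSON.",
  tools: toolDefinitions,
  maxSteps: 8,
  // Normal tool-calling turns: no schema constraint.
  callLLM: (messages, tools) => callProviderLLM(messages, tools),
  // Final output-producing turn: same call, but with the JSON schema applied.
  callFinalLLM: (messages, tools) => callProviderLLM(messages, tools, jsonSchema),
  onStepComplete: async (state) => persistState(state),
});
```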
