Skip to content

Commit 0a95db2

Browse files
bug fixes, docs updates
- AI Data Processing Node new features - bug fixes - docs updates
1 parent a85133d commit 0a95db2

16 files changed

Lines changed: 399 additions & 300 deletions

File tree

client/src/components/nodes/LlmProcessNode/hooks/useAIProcessor.ts

Lines changed: 129 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ import {GenericNodeData} from '../../../../types/workflow';
99
import {Message, MessageRole, SystemUserConfigValues, ToolSchema} from '../../../../types/ollama.types';
1010
import {LlmProcessNodeData} from '../types/workflow';
1111
import {useFetchModels} from '../../../../hooks/useFetchModels';
12-
import {runOrchestration, runSingleCall} from '../utils/aiOrchestration';
13-
import {assertIsSerializedInput, getExpectedOutputType, parseFormat, serializeInput} from '../utils/formatUtils';
12+
import Ajv from 'ajv';
13+
import {runOrchestration, runSingleCall, JSON_SCHEMA_PROMPT_PREFIX} from '../utils/aiOrchestration';
14+
import {assertIsSerializedInput, getExpectedOutputType, llmResponseJsonParse, parseFormat, serializeInput} from '../utils/formatUtils';
1415

1516

1617
export interface UseAIProcessorOptions {
@@ -108,13 +109,37 @@ export function useAIProcessor (options: UseAIProcessorOptions = {}) {
108109

109110
try {
110111
let messages: Message[] = [];
112+
let parsedFormat: object | undefined;
113+
let onErrorValidator: ((data: any) => boolean) | undefined;
114+
115+
try {
116+
({parsedFormat, onErrorValidator} = parseFormat(format));
117+
} catch (parseError) {
118+
const errorMsg = `Invalid format JSON: ${parseError instanceof Error ? parseError.message : String(parseError)}`;
119+
120+
setError(prev => [...prev, errorMsg]);
121+
onError?.(errorMsg);
122+
123+
return null;
124+
}
125+
126+
const enhancedSystemPrompt = parsedFormat
127+
? `${prompt ? `${prompt}\n\n` : ""}${JSON_SCHEMA_PROMPT_PREFIX}\n${JSON.stringify(parsedFormat, null, 4)}`
128+
: prompt;
129+
130+
// Two-phase mode: when tools AND structured output are both present some LLMs
131+
// fail to call tools if a structured output format is also requested.
132+
// Phase 1 runs the tool-calling pass with only the plain prompt;
133+
// Phase 2 reformats the raw reply into the required JSON schema without tools.
134+
const needsTwoPhase = !!(parsedFormat && tools?.length);
135+
const phase1SystemPrompt = needsTwoPhase ? prompt : enhancedSystemPrompt;
111136

112137
if (conversationHistory.value.length === 0) {
113138
// First run - initialize conversation
114-
if (prompt) {
139+
if (phase1SystemPrompt) {
115140
messages.push({
116141
role: MessageRole.SYSTEM,
117-
content: prompt
142+
content: phase1SystemPrompt
118143
});
119144
}
120145

@@ -187,52 +212,120 @@ export function useAIProcessor (options: UseAIProcessorOptions = {}) {
187212
}
188213
}
189214

190-
let parsedFormat: object | undefined;
191-
let onErrorValidator: ((data: any) => boolean) | undefined;
215+
let result: any;
192216

193-
try {
194-
({parsedFormat, onErrorValidator} = parseFormat(format));
195-
} catch (parseError) {
196-
const errorMsg = `Invalid format JSON: ${parseError instanceof Error ? parseError.message : String(parseError)}`;
217+
if (needsTwoPhase) {
218+
// Phase 1: tool-calling pass — no structured output format so tools fire correctly
219+
const phase1Response = await runSingleCall({
220+
messages,
221+
model,
222+
tools,
223+
maxToolRetries,
224+
think,
225+
temperature
226+
});
197227

198-
setError(prev => [...prev, errorMsg]);
199-
onError?.(errorMsg);
228+
if (!phase1Response.success) {
229+
const errorMsg = `Failed to fetch AI response (phase 1): ${phase1Response.error}`;
200230

201-
return null;
202-
}
231+
setError(prev => [...prev, errorMsg]);
232+
onError?.(errorMsg);
203233

204-
const response = await runSingleCall({
205-
messages,
206-
model,
207-
...(parsedFormat ? {format: parsedFormat} : {}),
208-
tools,
209-
maxToolRetries,
210-
think,
211-
temperature
212-
});
234+
return null;
235+
}
213236

214-
if (!response.success) {
215-
const errorMsg = `Failed to fetch AI response: ${response.error}`;
237+
const phase1Reply = phase1Response.reply;
216238

217-
setError(prev => [...prev, errorMsg]);
218-
onError?.(errorMsg);
239+
// Short-circuit: if Phase 1 already produced valid structured output, skip Phase 2
240+
let phase2Reply: string | undefined;
219241

220-
return null;
221-
}
242+
try {
243+
const ajv = new Ajv();
244+
const validate = ajv.compile(parsedFormat!);
245+
const phase1Parsed = llmResponseJsonParse(phase1Reply);
246+
247+
if (validate(phase1Parsed)) {
248+
phase2Reply = phase1Reply;
249+
}
250+
} catch {
251+
// Not valid JSON or schema mismatch — proceed to Phase 2
252+
}
253+
254+
if (phase2Reply === undefined) {
255+
// Phase 2: reformat pass — structured output, no tools
256+
const phase2Messages: Message[] = [
257+
{
258+
role: MessageRole.SYSTEM,
259+
content: enhancedSystemPrompt ?? `${JSON_SCHEMA_PROMPT_PREFIX}\n${JSON.stringify(parsedFormat, null, 4)}`
260+
},
261+
{
262+
role: MessageRole.USER,
263+
content: `Here is the response to reformat:\n\n${phase1Reply}\n\nPlease reformat it to strictly match the required JSON schema.`
264+
}
265+
];
266+
267+
const phase2Response = await runSingleCall({
268+
messages: phase2Messages,
269+
model,
270+
format: parsedFormat,
271+
maxToolRetries,
272+
think,
273+
temperature
274+
});
275+
276+
if (!phase2Response.success) {
277+
const errorMsg = `Failed to fetch AI response (phase 2): ${phase2Response.error}`;
222278

223-
let result = response.reply;
224-
const updatedHistory = [...messages, {
225-
role: MessageRole.ASSISTANT,
226-
content: result
227-
}];
279+
setError(prev => [...prev, errorMsg]);
280+
onError?.(errorMsg);
281+
282+
return null;
283+
}
284+
285+
phase2Reply = phase2Response.reply;
286+
}
228287

229-
conversationHistory.onChange(updatedHistory);
288+
result = phase2Reply;
289+
290+
// Store Phase 1 messages + final structured reply in history
291+
conversationHistory.onChange([...messages, {
292+
role: MessageRole.ASSISTANT,
293+
content: result
294+
}]);
295+
} else {
296+
// Single-call path: no tools, or no structured output — no conflict
297+
const response = await runSingleCall({
298+
messages,
299+
model,
300+
...(parsedFormat ? {format: parsedFormat} : {}),
301+
tools,
302+
maxToolRetries,
303+
think,
304+
temperature
305+
});
306+
307+
if (!response.success) {
308+
const errorMsg = `Failed to fetch AI response: ${response.error}`;
309+
310+
setError(prev => [...prev, errorMsg]);
311+
onError?.(errorMsg);
312+
313+
return null;
314+
}
315+
316+
result = response.reply;
317+
318+
conversationHistory.onChange([...messages, {
319+
role: MessageRole.ASSISTANT,
320+
content: result
321+
}]);
322+
}
230323

231324
try {
232325
const expectedOutputType = getExpectedOutputType(parsedFormat);
233326

234327
if (expectedOutputType === "object" || expectedOutputType === "array") {
235-
result = JSON.parse(result);
328+
result = llmResponseJsonParse(result);
236329

237330
if (expectedOutputType === "object" && onErrorValidator && onErrorValidator(result)) {
238331
const errorMsg = `LLM returned an error response matching onError schema:\n${JSON.stringify(result, null, 4)}`;
@@ -252,7 +345,6 @@ export function useAIProcessor (options: UseAIProcessorOptions = {}) {
252345
return null;
253346
}
254347

255-
256348
onSuccess?.(result);
257349

258350
return result;

client/src/components/nodes/LlmProcessNode/utils/aiOrchestration/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ export * from "./types";
88

99
export {runSingleCall} from "./runSingleCall";
1010

11-
export {runOrchestration} from "./runOrchestration";
11+
export {runOrchestration, JSON_SCHEMA_PROMPT_PREFIX} from "./runOrchestration";

client/src/components/nodes/LlmProcessNode/utils/aiOrchestration/runOrchestration.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
************************************************************************/
66

77
import {Message, MessageRole} from "../../../../../types/ollama.types";
8-
import {getExpectedOutputType, parseFormat} from "../formatUtils";
8+
import {getExpectedOutputType, llmResponseJsonParse, parseFormat} from "../formatUtils";
99
import {AgentTaskResult, OrchestrationParams, OrchestrationResult, OrchestratorPlan} from "./types";
1010
import {buildAggregationMessage, buildDependencyContext} from "./messageBuilders";
1111
import {runSingleCall} from "./runSingleCall";
@@ -40,6 +40,8 @@ const ORCHESTRATOR_PLAN_SCHEMA = {
4040
required: ["tasks"]
4141
};
4242

43+
export const JSON_SCHEMA_PROMPT_PREFIX = 'Your response must be valid JSON matching the following schema exactly:';
44+
4345
/**
4446
* When the input is an array:
4547
* 1. Planning call — orchestrator (using the node's system prompt) decomposes
@@ -57,12 +59,16 @@ export async function runOrchestration (params: OrchestrationParams): Promise<Or
5759
// ------------------------------------------------------------------
5860
// 1. Planning call
5961
// ------------------------------------------------------------------
62+
const planningSystemPrompt = prompt
63+
? `${prompt}\n\n${JSON_SCHEMA_PROMPT_PREFIX}\n${JSON.stringify(ORCHESTRATOR_PLAN_SCHEMA, null, 4)}`
64+
: `${JSON_SCHEMA_PROMPT_PREFIX}\n${JSON.stringify(ORCHESTRATOR_PLAN_SCHEMA, null, 4)}`;
65+
6066
const planningMessages: Message[] = [
61-
...(prompt ? [{role: MessageRole.SYSTEM, content: prompt}] : []),
67+
{role: MessageRole.SYSTEM, content: planningSystemPrompt},
6268
{
6369
role: MessageRole.USER,
6470
// eslint-disable-next-line max-len
65-
content: `Analyze the following input and decompose it into a list of individual tasks to be processed by separate agents. For each task provide: the content to process, an optional custom systemPrompt if the task requires specialized instructions, a tools array with the exact names of any tools that task needs (leave empty if none), and a dependsOn array with the zero-based indices of any earlier tasks whose output this task requires as input (only backward references allowed — a task can only reference tasks that appear before it in the list; omit if independent).${buildToolsDescription(tools)}\n\nInput:\n${JSON.stringify(input, null, 2)}`
71+
content: `Analyze the following input and decompose it into a list of individual tasks to be processed by separate agents. For each task provide: the content to process, an optional custom systemPrompt if the task requires specialized instructions, a tools array with the exact names of any tools that task needs (leave empty if none), and a dependsOn array with the zero-based indices of any earlier tasks whose output this task requires as input (only backward references allowed — a task can only reference tasks that appear before it in the list; omit if independent).${buildToolsDescription(tools)}\n\nInput:\n${JSON.stringify(input, null, 4)}`
6672
}
6773
];
6874

@@ -92,7 +98,7 @@ export async function runOrchestration (params: OrchestrationParams): Promise<Or
9298
let plan: OrchestratorPlan;
9399

94100
try {
95-
plan = JSON.parse(planningResponse.reply) as OrchestratorPlan;
101+
plan = llmResponseJsonParse(planningResponse.reply) as OrchestratorPlan;
96102

97103
const totalTasks = plan.tasks.length;
98104

@@ -208,11 +214,6 @@ export async function runOrchestration (params: OrchestrationParams): Promise<Or
208214
console.log("Aggregating", taskResults.length, "result(s):", taskResults.map(r => ({task: r.task.content, response: r.response})));
209215
/* #endif */
210216

211-
const aggregationMessages: Message[] = [
212-
...(prompt ? [{role: MessageRole.SYSTEM, content: prompt}] : []),
213-
{role: MessageRole.USER, content: buildAggregationMessage(taskResults)}
214-
];
215-
216217
let parsedFormat: object | undefined;
217218
let onErrorValidator: ((data: any) => boolean) | undefined;
218219

@@ -228,6 +229,18 @@ export async function runOrchestration (params: OrchestrationParams): Promise<Or
228229
};
229230
}
230231

232+
const aggregationSystemPrompt = parsedFormat
233+
? `${ prompt
234+
? `${prompt}\n\n`
235+
: "" }${JSON_SCHEMA_PROMPT_PREFIX}\n${JSON.stringify(parsedFormat, null, 4)}`
236+
: prompt;
237+
238+
239+
const aggregationMessages: Message[] = [
240+
...(aggregationSystemPrompt ? [{role: MessageRole.SYSTEM, content: aggregationSystemPrompt}] : []),
241+
{role: MessageRole.USER, content: buildAggregationMessage(taskResults)}
242+
];
243+
231244
// Aggregation has all task results in-context — no tools needed.
232245
const aggregationResponse = await runSingleCall({
233246
messages: aggregationMessages,
@@ -251,7 +264,7 @@ export async function runOrchestration (params: OrchestrationParams): Promise<Or
251264
const expectedOutputType = getExpectedOutputType(parsedFormat);
252265

253266
if (expectedOutputType === "object" || expectedOutputType === "array") {
254-
result = JSON.parse(result);
267+
result = llmResponseJsonParse(result);
255268

256269
if (expectedOutputType === "object" && onErrorValidator && onErrorValidator(result)) {
257270
return {

client/src/components/nodes/LlmProcessNode/utils/formatUtils.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
************************************************************************/
66

77
import Ajv from "ajv";
8+
import {sanitizeJsonInput} from "../../ToolNode/tools/utils/sanitize";
89

910

1011
// ---------------------------------------------------------------------------
@@ -111,6 +112,55 @@ export function parseFormat (
111112
return {parsedFormat, onErrorValidator};
112113
}
113114

115+
/**
116+
* Parses LLM response text into JSON, unwrapping optional markdown code fences.
117+
*
118+
* Supported input forms:
119+
* - raw JSON
120+
* - ```json\n{...}\n```
121+
* - ```\n{...}\n```
122+
* - fenced arrays
123+
* - leading/trailing whitespace
124+
*/
125+
export function llmResponseJsonParse<T = any> (reply: string): T {
126+
if (typeof reply !== "string") {
127+
throw new TypeError(
128+
`Expected string response, received ${typeof reply}`
129+
);
130+
}
131+
132+
let jsonText = reply.trim();
133+
134+
// Remove UTF-8 BOM if present
135+
jsonText = jsonText.replace(/^\uFEFF/, "");
136+
137+
// Remove opening fence
138+
jsonText = jsonText.replace(/^```(?:json)?\s*/i, "");
139+
140+
// Remove closing fence
141+
jsonText = jsonText.replace(/\s*```$/, "");
142+
143+
jsonText = jsonText.trim();
144+
145+
try {
146+
return JSON.parse(jsonText) as T;
147+
}
148+
catch (error) {
149+
try {
150+
return sanitizeJsonInput(jsonText) as T;
151+
}
152+
catch (sanitizeError) {
153+
throw new Error(
154+
`Failed to parse JSON: ${
155+
error instanceof Error ? error.message : String(error)
156+
}; sanitization fallback failed: ${
157+
sanitizeError instanceof Error ? sanitizeError.message : String(sanitizeError)
158+
}`
159+
);
160+
}
161+
}
162+
}
163+
114164

115165
// ---------------------------------------------------------------------------
116166
// Input serialization

0 commit comments

Comments
 (0)