Skip to content

Commit 70fc89d

Browse files
committed
Refactor pilot server to utilize agent-based architecture for workflow execution and tool management. Update event handling, improve error handling, and enhance workflow-related functions. Adjust SQL queries for better performance and consistency. Update README and other documentation for clarity.
1 parent 0687dd8 commit 70fc89d

19 files changed

Lines changed: 1127 additions & 278 deletions

mcp-studio/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ The `@ref` syntax wires data between steps:
7373

7474
#### Examples
7575

76-
```json
76+
```jsonc
7777
// Direct reference - entire value
7878
{ "user": "@fetch_user" }
7979

mcp-studio/server/main.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ const runtime = withRuntime<Env, typeof StateSchema, Registry>({
2626
events: [...WORKFLOW_EVENTS] as string[],
2727
handler: async ({ events }, env) => {
2828
try {
29-
console.log("handling events", events);
3029
handleWorkflowEvents(events, env as unknown as Env);
31-
console.log("events handled");
3230
return { success: true };
3331
} catch (error) {
3432
console.error(`[MAIN] Error handling events: ${error}`);

mcp-studio/server/stdio-tools.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -985,7 +985,7 @@ export async function registerStdioTools(server: McpServer): Promise<void> {
985985
);
986986

987987
const stepResults = await runSQL<Record<string, unknown>>(
988-
"SELECT * FROM workflow_step_result WHERE execution_id = ? ORDER BY created_at ASC",
988+
"SELECT * FROM workflow_execution_step_result WHERE execution_id = ? ORDER BY started_at_epoch_ms ASC",
989989
[args.id],
990990
);
991991

@@ -1097,8 +1097,8 @@ export async function registerStdioTools(server: McpServer): Promise<void> {
10971097
"https://assets.decocache.com/decocms/fd07a578-6b1c-40f1-bc05-88a3b981695d/f7fc4ffa81aec04e37ae670c3cd4936643a7b269.png";
10981098

10991099
await runSQL(
1100-
`INSERT INTO assistants (id, title, description, avatar, system_prompt, gateway_id, model, created_at, updated_at, created_by, updated_by)
1101-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
1100+
`INSERT INTO assistants (id, title, description, avatar, system_prompt, gateway_id, model, instructions, created_at, updated_at, created_by, updated_by)
1101+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
11021102
[
11031103
id,
11041104
args.data.title,
@@ -1107,6 +1107,7 @@ export async function registerStdioTools(server: McpServer): Promise<void> {
11071107
args.data.system_prompt || "",
11081108
args.data.gateway_id || "",
11091109
JSON.stringify(args.data.model || { id: "", connectionId: "" }),
1110+
"", // instructions - default to empty string
11101111
now,
11111112
now,
11121113
"stdio-user",

mcp-studio/server/tools/workflow.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -131,28 +131,32 @@ export const createListTool = (env: Env) =>
131131
`;
132132

133133
const itemsResult: any =
134-
await env.MESH_REQUEST_CONTEXT?.state?.DATABASE.DATABASES_RUN_SQL({
134+
await env.MESH_REQUEST_CONTEXT?.state?.DATABASE?.DATABASES_RUN_SQL({
135135
sql,
136136
params: [...params, limit, offset],
137137
});
138138

139+
if (!itemsResult?.result?.[0]?.results) {
140+
throw new Error("Database query failed or returned invalid result");
141+
}
142+
139143
const countQuery = `SELECT COUNT(*) as count FROM workflow_collection ${whereClause}`;
140144
const countResult =
141-
await env.MESH_REQUEST_CONTEXT?.state?.DATABASE.DATABASES_RUN_SQL({
145+
await env.MESH_REQUEST_CONTEXT?.state?.DATABASE?.DATABASES_RUN_SQL({
142146
sql: countQuery,
143147
params,
144148
});
145149
const dbTotalCount = parseInt(
146-
(countResult.result[0]?.results?.[0] as { count: string })?.count ||
150+
(countResult?.result?.[0]?.results?.[0] as { count: string })?.count ||
147151
"0",
148152
10,
149153
);
150154

151155
// Get DB workflows
152-
const dbWorkflows: WorkflowWithMeta[] =
153-
itemsResult.result[0]?.results?.map((item: Record<string, unknown>) =>
156+
const dbWorkflows: WorkflowWithMeta[] = itemsResult.result[0].results.map(
157+
(item: Record<string, unknown>) =>
154158
transformDbRowToWorkflowCollectionItem(item),
155-
) || [];
159+
);
156160

157161
// Get file-based workflows (always included, marked readonly)
158162
const fileWorkflows = getFileWorkflows();
@@ -183,11 +187,11 @@ export async function getWorkflowCollection(
183187
): Promise<WorkflowWithMeta | null> {
184188
// First check DB
185189
const result =
186-
await env.MESH_REQUEST_CONTEXT?.state?.DATABASE.DATABASES_RUN_SQL({
190+
await env.MESH_REQUEST_CONTEXT?.state?.DATABASE?.DATABASES_RUN_SQL({
187191
sql: "SELECT * FROM workflow_collection WHERE id = ? LIMIT 1",
188192
params: [id],
189193
});
190-
const item = result.result[0]?.results?.[0] || null;
194+
const item = result?.result?.[0]?.results?.[0] || null;
191195

192196
if (item) {
193197
return transformDbRowToWorkflowCollectionItem(
@@ -294,7 +298,7 @@ Example workflow with 2 parallel steps:
294298
295299
Example workflow with a step that references the output of another step:
296300
{ "title": "Get first user and then fetch orders", "steps": [
297-
true }, "transformCode": "export default async (i) => i[0]" },
301+
{ "name": "fetch_users", "action": { "toolName": "GET_USERS" }, "input": { "all": true }, "transformCode": "export default async (i) => i[0]" },
298302
{ "name": "fetch_orders", "action": { "toolName": "GET_ORDERS" }, "input": { "user": "@fetch_users.user" } },
299303
]}
300304
`,
@@ -395,18 +399,18 @@ async function updateWorkflowCollection(
395399
`;
396400

397401
const result =
398-
await env.MESH_REQUEST_CONTEXT?.state?.DATABASE.DATABASES_RUN_SQL({
402+
await env.MESH_REQUEST_CONTEXT?.state?.DATABASE?.DATABASES_RUN_SQL({
399403
sql,
400404
params,
401405
});
402406

403-
if (result.result[0]?.results?.length === 0) {
407+
if (!result?.result?.[0]?.results || result.result[0].results.length === 0) {
404408
throw new Error(`Workflow collection with id ${id} not found`);
405409
}
406410

407411
return {
408412
item: transformDbRowToWorkflowCollectionItem(
409-
result.result[0]?.results?.[0] as Record<string, unknown>,
413+
result.result[0].results[0] as Record<string, unknown>,
410414
),
411415
};
412416
}

mcp-studio/server/workflow-loader.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,15 @@ export function isFilesystemMode(): boolean {
8383
* Parse a workflow file and extract workflow(s)
8484
*/
8585
async function parseWorkflowFile(filePath: string): Promise<LoadedWorkflow[]> {
86-
const content = await readFile(filePath, "utf-8");
86+
let content: string;
87+
try {
88+
content = await readFile(filePath, "utf-8");
89+
} catch (error) {
90+
// Handle file access errors (deleted, permission denied, etc.) gracefully
91+
console.error(`[workflow-loader] Failed to read ${filePath}:`, error);
92+
return [];
93+
}
94+
8795
let parsed: unknown;
8896

8997
try {
@@ -254,7 +262,7 @@ export function getWorkflowById(id: string): LoadedWorkflow | undefined {
254262
* Start watching for file changes
255263
*/
256264
export async function startWatching(
257-
options: WorkflowLoaderOptions,
265+
options?: WorkflowLoaderOptions,
258266
): Promise<void> {
259267
const source = options || getWorkflowSource();
260268

pilot/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ bun run check
282282
## See Also
283283

284284
- [Architecture](docs/ARCHITECTURE.md) - Detailed architecture overview
285-
- [Mesh Bridge](../../mesh-bridge) - Browser interface for Pilot
285+
- [Mesh Bridge](../mesh-bridge) - Browser interface for Pilot
286286
- [MCP Mesh](https://github.com/decolabs/mesh) - The mesh platform
287287

288288
## License

pilot/server/agent.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ export class PilotAgent {
116116
temperature: 0.7,
117117
maxRouterIterations: 10,
118118
maxExecutorIterations: 30,
119-
smartModel: config.smartModel || config.fastModel,
120119
...config,
120+
smartModel: config.smartModel ?? config.fastModel,
121121
};
122122
this.ctx = ctx;
123123
this.localTools = getAllLocalTools();

pilot/server/core/execution-adapter.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,12 @@ export async function findContinuableThread(
190190
if (exec.status !== "success") continue;
191191

192192
// Must be within TTL
193-
if (exec.completed_at_epoch_ms) {
194-
const age = now - exec.completed_at_epoch_ms;
195-
if (age > ttlMs) continue;
193+
if (!exec.completed_at_epoch_ms) {
194+
// Missing completion timestamp - skip (incomplete execution)
195+
continue;
196196
}
197+
const age = now - exec.completed_at_epoch_ms;
198+
if (age > ttlMs) continue;
197199

198200
// Check metadata
199201
const input = exec.input || {};

pilot/server/core/llm-executor.ts

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import {
1212
resolveRefs,
1313
groupStepsByLevel,
1414
} from "../types/workflow.ts";
15-
import { loadWorkflow, listWorkflows } from "./workflow-studio-adapter.ts";
1615
import { getAllLocalTools } from "../tools/index.ts";
1716

1817
// ============================================================================
@@ -52,6 +51,8 @@ export interface ExecutorConfig {
5251
fastModel: string;
5352
smartModel: string;
5453
onProgress?: (stepName: string, message: string) => void;
54+
loadWorkflow?: (workflowId: string) => Promise<Workflow | null>;
55+
listWorkflows?: () => Promise<Workflow[]>;
5556
}
5657

5758
export interface ExecutionContext {
@@ -64,6 +65,8 @@ export interface ExecutionContext {
6465
listConnections: ListConnectionsCallback;
6566
publishEvent?: (type: string, data: Record<string, unknown>) => Promise<void>;
6667
toolCache: Map<string, ToolDefinition>;
68+
loadWorkflow?: (workflowId: string) => Promise<Workflow | null>;
69+
listWorkflows?: () => Promise<Workflow[]>;
6770
}
6871

6972
export interface WorkflowResult {
@@ -92,9 +95,17 @@ export async function runWorkflow(
9295
type: string,
9396
data: Record<string, unknown>,
9497
) => Promise<void>;
98+
loadWorkflow?: (workflowId: string) => Promise<Workflow | null>;
9599
},
96100
): Promise<WorkflowResult> {
97-
const workflow = await loadWorkflow(workflowId);
101+
if (!options.loadWorkflow) {
102+
return {
103+
success: false,
104+
error: "loadWorkflow callback not provided",
105+
};
106+
}
107+
108+
const workflow = await options.loadWorkflow(workflowId);
98109
if (!workflow) {
99110
return { success: false, error: `Workflow not found: ${workflowId}` };
100111
}
@@ -117,6 +128,8 @@ export async function runWorkflowDirect(
117128
type: string,
118129
data: Record<string, unknown>,
119130
) => Promise<void>;
131+
loadWorkflow?: (workflowId: string) => Promise<Workflow | null>;
132+
listWorkflows?: () => Promise<Workflow[]>;
120133
},
121134
): Promise<WorkflowResult> {
122135
const ctx: ExecutionContext = {
@@ -129,6 +142,8 @@ export async function runWorkflowDirect(
129142
listConnections: options.listConnections,
130143
publishEvent: options.publishEvent,
131144
toolCache: new Map(),
145+
loadWorkflow: options.loadWorkflow,
146+
listWorkflows: options.listWorkflows,
132147
};
133148

134149
try {
@@ -266,6 +281,10 @@ async function executeCodeStep(
266281

267282
const code = step.action.code;
268283

284+
// SECURITY WARNING: Using new Function() is equivalent to eval() and provides
285+
// full Node.js access without sandboxing. This should only be used with trusted workflows.
286+
// Consider using a sandboxed execution environment (vm2, isolated-vm, or worker threads)
287+
// for untrusted code execution in the future.
269288
try {
270289
const fn = new Function(
271290
"input",
@@ -585,7 +604,10 @@ async function executeToolCall(
585604
): Promise<unknown> {
586605
// Meta tools
587606
if (toolName === "list_workflows") {
588-
const workflows = await listWorkflows();
607+
if (!ctx.listWorkflows) {
608+
throw new Error("listWorkflows callback not provided");
609+
}
610+
const workflows = await ctx.listWorkflows();
589611
return {
590612
workflows: workflows.map((w) => ({
591613
id: w.id,
@@ -596,6 +618,9 @@ async function executeToolCall(
596618
}
597619

598620
if (toolName === "start_workflow") {
621+
if (!ctx.loadWorkflow) {
622+
throw new Error("loadWorkflow callback not provided");
623+
}
599624
const workflowId = args.workflowId as string;
600625
const workflowInput = (args.input as Record<string, unknown>) || {};
601626
const result = await runWorkflow(workflowId, workflowInput, {
@@ -604,6 +629,7 @@ async function executeToolCall(
604629
callMeshTool: ctx.callMeshTool,
605630
listConnections: ctx.listConnections,
606631
publishEvent: ctx.publishEvent,
632+
loadWorkflow: ctx.loadWorkflow,
607633
});
608634
return result;
609635
}

pilot/server/core/workflow-studio-adapter.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,17 @@ export async function deleteWorkflow(workflowId: string): Promise<boolean> {
200200
const client = requireStudioClient();
201201

202202
try {
203+
// Check if workflow exists and is readonly (file-based)
204+
const existingResult = (await client.callTool("COLLECTION_WORKFLOW_GET", {
205+
id: workflowId,
206+
})) as { item?: { readonly?: boolean } };
207+
208+
if (existingResult.item?.readonly) {
209+
throw new Error(
210+
`Cannot delete "${workflowId}" - it's a file-based workflow. Delete the JSON file directly.`,
211+
);
212+
}
213+
203214
await client.callTool("COLLECTION_WORKFLOW_DELETE", {
204215
id: workflowId,
205216
});

0 commit comments

Comments
 (0)