Skip to content

Commit 808ef9a

Browse files
author
Roman Melnikov
committed
feat(mcp-server): configurable async concurrency controller (#238)
1 parent 435e72b commit 808ef9a

15 files changed

Lines changed: 791 additions & 224 deletions

File tree

bun.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/cli/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@ageflow/cli",
3-
"version": "0.6.0",
3+
"version": "0.7.0",
44
"description": "CLI for ageflow \u2014 agentwf run / validate / dry-run / init",
55
"homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/cli",
66
"type": "module",
@@ -26,7 +26,7 @@
2626
"@ageflow/executor": "^0.7.0",
2727
"@ageflow/learning": "^0.5.0",
2828
"@ageflow/learning-sqlite": "^0.4.1",
29-
"@ageflow/mcp-server": "^0.7.0",
29+
"@ageflow/mcp-server": "^0.8.0",
3030
"@ageflow/server": "^0.6.0",
3131
"@ageflow/server-sqlite": "^0.2.0",
3232
"chalk": "^5.3.0",

packages/cli/src/__tests__/mcp-serve.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,31 @@ describe("parseMcpServeArgs", () => {
121121
expect(args.maxTurns).toBeNull();
122122
});
123123

124+
it("parses --max-concurrent-jobs", () => {
125+
const args = parseMcpServeArgs([
126+
"wf.ts",
127+
"--async",
128+
"--max-concurrent-jobs",
129+
"3",
130+
]);
131+
expect(args.maxConcurrentJobs).toBe(3);
132+
});
133+
134+
it("defaults maxConcurrentJobs to undefined", () => {
135+
const args = parseMcpServeArgs(["wf.ts"]);
136+
expect(args.maxConcurrentJobs).toBeUndefined();
137+
});
138+
139+
it("parses --max-concurrent-jobs-per-workflow", () => {
140+
const args = parseMcpServeArgs([
141+
"wf.ts",
142+
"--async",
143+
"--max-concurrent-jobs-per-workflow",
144+
"4",
145+
]);
146+
expect(args.maxConcurrentJobsPerWorkflow).toBe(4);
147+
});
148+
124149
it("parses --hitl auto", () => {
125150
const args = parseMcpServeArgs(["wf.ts", "--hitl", "auto"]);
126151
expect(args.hitlStrategy).toBe("auto");
@@ -178,6 +203,38 @@ describe("parseMcpServeArgs", () => {
178203
);
179204
});
180205

206+
it("throws on invalid --max-concurrent-jobs value", () => {
207+
expect(() =>
208+
parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs", "0"]),
209+
).toThrow(/positive integer/);
210+
});
211+
212+
it("throws on negative --max-concurrent-jobs value", () => {
213+
expect(() =>
214+
parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs", "-2"]),
215+
).toThrow(/positive integer/);
216+
});
217+
218+
it("throws on invalid --max-concurrent-jobs-per-workflow value", () => {
219+
expect(() =>
220+
parseMcpServeArgs([
221+
"wf.ts",
222+
"--async",
223+
"--max-concurrent-jobs-per-workflow",
224+
"1.5",
225+
]),
226+
).toThrow(/positive integer/);
227+
});
228+
229+
it("requires --async for concurrency flags", () => {
230+
expect(() =>
231+
parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs", "2"]),
232+
).toThrow(/requires --async/);
233+
expect(() =>
234+
parseMcpServeArgs(["wf.ts", "--max-concurrent-jobs-per-workflow", "2"]),
235+
).toThrow(/requires --async/);
236+
});
237+
181238
it("throws on unknown flag", () => {
182239
expect(() => parseMcpServeArgs(["wf.ts", "--unknown-flag"])).toThrow(
183240
/Unknown flag/,

packages/cli/src/commands/mcp-serve.ts

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
* --no-max-duration disable duration ceiling
1111
* --max-turns <n> maximum agent turns
1212
* --no-max-turns disable turns ceiling
13+
* --max-concurrent-jobs <n> max concurrent running jobs
14+
* --max-concurrent-jobs-per-workflow <n> max concurrent running jobs for this workflow
1315
* --hitl <strategy> HITL strategy: elicit | auto | fail (default: elicit)
1416
* --name <name> MCP server name (default: workflow name)
1517
* --log-file <path> write stderr log to a file
@@ -24,7 +26,11 @@
2426
import fs from "node:fs";
2527
import path from "node:path";
2628
import type { WorkflowDef } from "@ageflow/core";
27-
import type { CliCeilings, HitlStrategy } from "@ageflow/mcp-server";
29+
import type {
30+
CliCeilings,
31+
ConcurrencyConfig,
32+
HitlStrategy,
33+
} from "@ageflow/mcp-server";
2834
import {
2935
createHttpTransport,
3036
createSingleWorkflowServer,
@@ -39,6 +45,8 @@ export interface McpServeArgs {
3945
readonly maxCostUsd?: number | null;
4046
readonly maxDurationSec?: number | null;
4147
readonly maxTurns?: number | null;
48+
readonly maxConcurrentJobs?: number;
49+
readonly maxConcurrentJobsPerWorkflow?: number;
4250
readonly hitlStrategy: HitlStrategy;
4351
readonly serverName?: string;
4452
readonly logFile?: string;
@@ -86,6 +94,8 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs {
8694
let maxCostUsd: number | null | undefined = undefined;
8795
let maxDurationSec: number | null | undefined = undefined;
8896
let maxTurns: number | null | undefined = undefined;
97+
let maxConcurrentJobs: number | undefined = undefined;
98+
let maxConcurrentJobsPerWorkflow: number | undefined = undefined;
8999
let hitlStrategy: HitlStrategy = "elicit";
90100
let serverName: string | undefined = undefined;
91101
let logFile: string | undefined = undefined;
@@ -155,6 +165,38 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs {
155165
maxTurns = null;
156166
break;
157167

168+
case "--max-concurrent-jobs": {
169+
const val = args[++i];
170+
if (val === undefined || val.startsWith("--")) {
171+
throw new Error("--max-concurrent-jobs requires a numeric argument");
172+
}
173+
const n = Number(val);
174+
if (!Number.isInteger(n) || n <= 0) {
175+
throw new Error(
176+
`--max-concurrent-jobs must be a positive integer, got: ${val}`,
177+
);
178+
}
179+
maxConcurrentJobs = n;
180+
break;
181+
}
182+
183+
case "--max-concurrent-jobs-per-workflow": {
184+
const val = args[++i];
185+
if (val === undefined || val.startsWith("--")) {
186+
throw new Error(
187+
"--max-concurrent-jobs-per-workflow requires a numeric argument",
188+
);
189+
}
190+
const n = Number(val);
191+
if (!Number.isInteger(n) || n <= 0) {
192+
throw new Error(
193+
`--max-concurrent-jobs-per-workflow must be a positive integer, got: ${val}`,
194+
);
195+
}
196+
maxConcurrentJobsPerWorkflow = n;
197+
break;
198+
}
199+
158200
case "--hitl": {
159201
const val = args[++i];
160202
if (val !== "elicit" && val !== "auto" && val !== "fail") {
@@ -273,9 +315,13 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs {
273315
asyncMode !== true &&
274316
(jobTtlMs !== undefined ||
275317
jobCheckpointTtlMs !== undefined ||
276-
jobDb !== undefined)
318+
jobDb !== undefined ||
319+
maxConcurrentJobs !== undefined ||
320+
maxConcurrentJobsPerWorkflow !== undefined)
277321
) {
278-
throw new Error("--job-ttl / --checkpoint-ttl / --job-db requires --async");
322+
throw new Error(
323+
"--job-ttl / --checkpoint-ttl / --job-db / --max-concurrent-jobs / --max-concurrent-jobs-per-workflow requires --async",
324+
);
279325
}
280326

281327
if (httpPort !== undefined && httpMode !== true) {
@@ -311,6 +357,10 @@ export function parseMcpServeArgs(argv: readonly string[]): McpServeArgs {
311357
...(maxCostUsd !== undefined ? { maxCostUsd } : {}),
312358
...(maxDurationSec !== undefined ? { maxDurationSec } : {}),
313359
...(maxTurns !== undefined ? { maxTurns } : {}),
360+
...(maxConcurrentJobs !== undefined ? { maxConcurrentJobs } : {}),
361+
...(maxConcurrentJobsPerWorkflow !== undefined
362+
? { maxConcurrentJobsPerWorkflow }
363+
: {}),
314364
...(serverName !== undefined ? { serverName } : {}),
315365
...(logFile !== undefined ? { logFile } : {}),
316366
...(asyncMode !== undefined ? { async: asyncMode } : {}),
@@ -370,6 +420,21 @@ async function runMcpServe(rawArgv: string[]): Promise<void> {
370420
: {}),
371421
...(parsed.maxTurns !== undefined ? { maxTurns: parsed.maxTurns } : {}),
372422
};
423+
const concurrency: ConcurrencyConfig | undefined =
424+
parsed.maxConcurrentJobs !== undefined ||
425+
parsed.maxConcurrentJobsPerWorkflow !== undefined
426+
? {
427+
...(parsed.maxConcurrentJobs !== undefined
428+
? { maxConcurrentJobs: parsed.maxConcurrentJobs }
429+
: {}),
430+
...(parsed.maxConcurrentJobsPerWorkflow !== undefined
431+
? {
432+
maxConcurrentJobsPerWorkflow:
433+
parsed.maxConcurrentJobsPerWorkflow,
434+
}
435+
: {}),
436+
}
437+
: undefined;
373438

374439
// Determine server name
375440
const serverName = parsed.serverName ?? workflow.name;
@@ -390,6 +455,7 @@ async function runMcpServe(rawArgv: string[]): Promise<void> {
390455
workflow,
391456
cliCeilings,
392457
hitlStrategy: parsed.hitlStrategy,
458+
...(concurrency !== undefined ? { concurrency } : {}),
393459
stderr,
394460
...(parsed.async === true ? { async: true } : {}),
395461
...(parsed.jobTtlMs !== undefined ? { jobTtlMs: parsed.jobTtlMs } : {}),
@@ -474,6 +540,8 @@ export function registerMcpCommand(program: Command): void {
474540
" --no-max-duration disable duration ceiling\n" +
475541
" --max-turns <n> max agent turns\n" +
476542
" --no-max-turns disable turns ceiling\n" +
543+
" --max-concurrent-jobs <n> max concurrent running jobs\n" +
544+
" --max-concurrent-jobs-per-workflow <n> max concurrent running jobs for this workflow\n" +
477545
" --hitl <strategy> elicit | auto | fail (default: elicit)\n" +
478546
" --name <name> MCP server name\n" +
479547
" --log-file <path> log stderr to file\n" +

packages/mcp-server/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@ageflow/mcp-server",
3-
"version": "0.7.0",
3+
"version": "0.8.0",
44
"description": "Expose ageflow workflows as MCP tools (stdio transport, progress streaming, HITL via elicitation).",
55
"homepage": "https://github.com/Neftedollar/ageflow/tree/master/packages/mcp-server",
66
"type": "module",

packages/mcp-server/src/__tests__/errors.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ describe("async-mode error mapping (#18)", () => {
6565
expect(ErrorCode.JOB_CANCELLED).toBe("JOB_CANCELLED");
6666
expect(ErrorCode.INVALID_RUN_STATE).toBe("INVALID_RUN_STATE");
6767
expect(ErrorCode.ASYNC_MODE_DISABLED).toBe("ASYNC_MODE_DISABLED");
68+
expect(ErrorCode.CONCURRENCY_LIMIT_EXCEEDED).toBe(
69+
"CONCURRENCY_LIMIT_EXCEEDED",
70+
);
6871
});
6972

7073
it("maps RunNotFoundError → JOB_NOT_FOUND", () => {

packages/mcp-server/src/__tests__/integration/async-mode.test.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,48 @@ describe("async mode: inflight lock (#18)", () => {
259259
release();
260260
h.dispose?.();
261261
});
262+
263+
it("applies maxConcurrentJobsPerWorkflow in single-workflow mode", async () => {
264+
const h = createMcpServer({
265+
workflow,
266+
cliCeilings: {},
267+
hitlStrategy: "auto",
268+
async: true,
269+
concurrency: {
270+
maxConcurrentJobs: 2,
271+
maxConcurrentJobsPerWorkflow: 1,
272+
},
273+
});
274+
275+
let release!: () => void;
276+
h._testRunExecutor = () =>
277+
new Promise((res) => {
278+
release = () => res({ a: "ok" });
279+
});
280+
281+
const first = h.callTool("start_ask", { q: "1" });
282+
for (let i = 0; i < 50 && typeof release !== "function"; i += 1) {
283+
await new Promise<void>((resolve) => setTimeout(resolve, 0));
284+
}
285+
expect(typeof release).toBe("function");
286+
287+
const second = await h.callTool("start_ask", { q: "2" });
288+
expect(second.isError).toBe(true);
289+
if (second.isError) {
290+
expect(second.structuredContent.errorCode).toBe("BUSY");
291+
expect(second.structuredContent.context).toMatchObject({
292+
scope: "workflow",
293+
kind: "start",
294+
workflowName: "ask",
295+
limit: 1,
296+
active: 1,
297+
});
298+
}
299+
300+
release();
301+
await first;
302+
h.dispose?.();
303+
});
262304
});
263305

264306
describe("async mode: cancel_workflow (#18)", () => {

0 commit comments

Comments
 (0)