Skip to content

Commit aefc53f

Browse files
committed
Make executor mcp attach to daemon
1 parent 49e1675 commit aefc53f

5 files changed

Lines changed: 478 additions & 34 deletions

File tree

apps/cli/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"@executor-js/runtime-quickjs": "workspace:*",
3131
"@executor-js/sdk": "workspace:*",
3232
"@jitl/quickjs-wasmfile-release-sync": "catalog:",
33+
"@modelcontextprotocol/sdk": "^1.29.0",
3334
"@sentry/bun": "^10.57.0",
3435
"effect": "catalog:",
3536
"quickjs-emscripten": "catalog:"

apps/cli/src/main.ts

Lines changed: 160 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ import type { PlatformError } from "effect/PlatformError";
6464
import * as Effect from "effect/Effect";
6565
import * as Option from "effect/Option";
6666
import * as Cause from "effect/Cause";
67+
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
68+
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
69+
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
6770

6871
import { ExecutorApi } from "@executor-js/api";
6972
import {
@@ -1122,9 +1125,123 @@ const withStdoutReroutedToStderr = async <A>(body: () => Promise<A>): Promise<A>
11221125
}
11231126
};
11241127

1128+
const mcpUrlForActiveLocalServer = (
1129+
connection: ExecutorServerConnection,
1130+
elicitationMode: "browser" | "model",
1131+
): URL => {
1132+
const url = new URL("/mcp", connection.origin);
1133+
if (elicitationMode === "browser") {
1134+
url.searchParams.set("elicitation_mode", "browser");
1135+
}
1136+
return url;
1137+
};
1138+
1139+
const closeMcpBridgeTransport = async (close: () => Promise<void>): Promise<void> => {
1140+
await close();
1141+
};
1142+
1143+
const isAbortDuringMcpBridgeClose = (error: Error): boolean =>
1144+
error.name === "AbortError" || error.message.toLowerCase().includes("aborted");
1145+
1146+
const runMcpHttpBridge = async (input: {
1147+
readonly manifest: ExecutorLocalServerManifest;
1148+
readonly elicitationMode: "browser" | "model";
1149+
}): Promise<void> => {
1150+
const stdio = new StdioServerTransport();
1151+
const authorization = getExecutorServerAuthorizationHeader(input.manifest.connection);
1152+
const http = new StreamableHTTPClientTransport(
1153+
mcpUrlForActiveLocalServer(input.manifest.connection, input.elicitationMode),
1154+
authorization ? { requestInit: { headers: { Authorization: authorization } } } : undefined,
1155+
);
1156+
1157+
let finished = false;
1158+
let closing = false;
1159+
let closePromise: Promise<void> | null = null;
1160+
let resolveExit: () => void = () => {};
1161+
1162+
const waitForExit = new Promise<void>((resolve) => {
1163+
resolveExit = resolve;
1164+
});
1165+
1166+
const finish = () => {
1167+
if (finished) return;
1168+
finished = true;
1169+
process.off("SIGINT", shutdown);
1170+
process.off("SIGTERM", shutdown);
1171+
process.stdin.off("end", shutdown);
1172+
process.stdin.off("close", shutdown);
1173+
resolveExit();
1174+
};
1175+
1176+
const closeBoth = (): Promise<void> => {
1177+
if (!closePromise) {
1178+
closing = true;
1179+
closePromise = Promise.resolve()
1180+
.then(() =>
1181+
Promise.allSettled([
1182+
closeMcpBridgeTransport(() => stdio.close()),
1183+
closeMcpBridgeTransport(() => http.close()),
1184+
]),
1185+
)
1186+
.then(() => undefined);
1187+
}
1188+
return closePromise;
1189+
};
1190+
1191+
function shutdown() {
1192+
finish();
1193+
void closeBoth();
1194+
}
1195+
1196+
const reportError = (context: string, cause: unknown) => {
1197+
const error = toError(cause);
1198+
if (closing && isAbortDuringMcpBridgeClose(error)) return;
1199+
console.error(`Executor MCP bridge ${context}: ${error.message}`);
1200+
};
1201+
1202+
const forwardMessage =
1203+
(send: (message: JSONRPCMessage) => Promise<void>, context: string) =>
1204+
(message: JSONRPCMessage) => {
1205+
void send(message).then(undefined, (cause: unknown) => {
1206+
reportError(context, cause);
1207+
shutdown();
1208+
});
1209+
};
1210+
1211+
process.once("SIGINT", shutdown);
1212+
process.once("SIGTERM", shutdown);
1213+
process.stdin.once("end", shutdown);
1214+
process.stdin.once("close", shutdown);
1215+
1216+
stdio.onclose = shutdown;
1217+
http.onclose = shutdown;
1218+
stdio.onerror = (error) => reportError("stdio transport error", error);
1219+
http.onerror = (error) => reportError("daemon transport error", error);
1220+
stdio.onmessage = forwardMessage((message) => http.send(message), "failed to send to daemon");
1221+
http.onmessage = forwardMessage((message) => stdio.send(message), "failed to send to stdio");
1222+
1223+
try {
1224+
await http.start();
1225+
await stdio.start();
1226+
await waitForExit;
1227+
} finally {
1228+
finish();
1229+
await closeBoth();
1230+
}
1231+
};
1232+
11251233
const runStdioMcpSession = (input: { readonly elicitationMode: "browser" | "model" }) =>
11261234
Effect.gen(function* () {
1235+
const active = yield* readActiveLocalServerManifest();
1236+
if (active) {
1237+
yield* Effect.promise(() =>
1238+
runMcpHttpBridge({ manifest: active, elicitationMode: input.elicitationMode }),
1239+
);
1240+
return;
1241+
}
1242+
11271243
const startupLock = yield* acquireLocalServerStartLock();
1244+
let activeAfterLock: ExecutorLocalServerManifest | null = null;
11281245
let web: Awaited<
11291246
ReturnType<
11301247
typeof withStdoutReroutedToStderr<{
@@ -1137,44 +1254,53 @@ const runStdioMcpSession = (input: { readonly elicitationMode: "browser" | "mode
11371254
> | null = null;
11381255

11391256
try {
1140-
yield* assertNoOtherActiveLocalServer();
1141-
web = yield* Effect.promise(() =>
1142-
withStdoutReroutedToStderr(async () => {
1143-
const host = "127.0.0.1";
1144-
const port = await Effect.runPromise(
1145-
chooseDaemonPort({ preferredPort: DEFAULT_PORT, hostname: host }),
1146-
);
1147-
const baseUrl = `http://localhost:${port}`;
1148-
const restoreWebBaseUrl = installDefaultExecutorWebBaseUrl(baseUrl);
1149-
1150-
try {
1151-
const executor = await getExecutor();
1152-
const server = await startServer({
1153-
port,
1154-
hostname: host,
1155-
embeddedWebUI,
1156-
});
1157-
const serverBaseUrl = `http://localhost:${server.port}`;
1158-
return { executor, server, baseUrl: serverBaseUrl, restoreWebBaseUrl };
1159-
} catch (cause) {
1160-
restoreWebBaseUrl();
1161-
throw cause;
1162-
}
1163-
}),
1164-
);
1165-
yield* publishLocalServerManifest({
1166-
kind: "foreground",
1167-
connection: normalizeExecutorServerConnection({
1168-
kind: "http",
1169-
origin: web.baseUrl,
1170-
displayName: "CLI MCP",
1171-
auth: { kind: "bearer", token: web.server.authToken },
1172-
}),
1173-
});
1257+
activeAfterLock = yield* readActiveLocalServerManifest();
1258+
if (!activeAfterLock) {
1259+
web = yield* Effect.promise(() =>
1260+
withStdoutReroutedToStderr(async () => {
1261+
const host = "127.0.0.1";
1262+
const port = await Effect.runPromise(
1263+
chooseDaemonPort({ preferredPort: DEFAULT_PORT, hostname: host }),
1264+
);
1265+
const baseUrl = `http://localhost:${port}`;
1266+
const restoreWebBaseUrl = installDefaultExecutorWebBaseUrl(baseUrl);
1267+
1268+
try {
1269+
const executor = await getExecutor();
1270+
const server = await startServer({
1271+
port,
1272+
hostname: host,
1273+
embeddedWebUI,
1274+
});
1275+
const serverBaseUrl = `http://localhost:${server.port}`;
1276+
return { executor, server, baseUrl: serverBaseUrl, restoreWebBaseUrl };
1277+
} catch (cause) {
1278+
restoreWebBaseUrl();
1279+
throw cause;
1280+
}
1281+
}),
1282+
);
1283+
yield* publishLocalServerManifest({
1284+
kind: "foreground",
1285+
connection: normalizeExecutorServerConnection({
1286+
kind: "http",
1287+
origin: web.baseUrl,
1288+
displayName: "CLI MCP",
1289+
auth: { kind: "bearer", token: web.server.authToken },
1290+
}),
1291+
});
1292+
}
11741293
} finally {
11751294
yield* releaseLocalServerStartLock(startupLock).pipe(Effect.ignore);
11761295
}
11771296

1297+
if (activeAfterLock) {
1298+
yield* Effect.promise(() =>
1299+
runMcpHttpBridge({ manifest: activeAfterLock, elicitationMode: input.elicitationMode }),
1300+
);
1301+
return;
1302+
}
1303+
11781304
if (!web) return yield* Effect.fail(new Error("Failed to start local Executor MCP server."));
11791305

11801306
try {

bun.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)