Skip to content

Commit db1ee2c

Browse files
committed
Route MCP transports through HttpClient layer
1 parent f5906e3 commit db1ee2c

6 files changed

Lines changed: 148 additions & 6 deletions

File tree

packages/core/sdk/src/executor.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1920,6 +1920,7 @@ export const createExecutor = <const TPlugins extends readonly AnyPlugin[] = rea
19201920
.resolveTools({
19211921
integration: rowToIntegration(integrationRow),
19221922
config: decodeJsonColumn(integrationRow.config),
1923+
httpClientLayer: runtime.ctx.httpClientLayer,
19231924
connection: ref,
19241925
template: existingRow ? AuthTemplateSlug.make(existingRow.template) : null,
19251926
storage: runtime.storage,

packages/core/sdk/src/plugin.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ export interface ResolveToolsInput<TStore = unknown> {
211211
* facades (e.g. a content-addressed spec blob) instead of inlining them
212212
* in `config`. */
213213
readonly storage: TStore;
214+
readonly httpClientLayer: Layer.Layer<HttpClient.HttpClient>;
214215
/** The connection whose tools are being resolved. */
215216
readonly connection: ConnectionRef;
216217
/** Which of the integration's declared auth methods the connection binds

packages/plugins/mcp/src/sdk/connection.ts

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ import type { OAuthClientProvider } from "@modelcontextprotocol/sdk/client/auth.
22
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
33
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
44
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
5+
import type { FetchLike } from "@modelcontextprotocol/sdk/shared/transport.js";
56
import { CfWorkerJsonSchemaValidator } from "@modelcontextprotocol/sdk/validation/cfworker";
6-
import { Effect, Predicate } from "effect";
7+
import { Effect, Layer, Predicate, Stream } from "effect";
8+
import { HttpClient, HttpClientRequest } from "effect/unstable/http";
79

810
// NOTE: `StdioClientTransport` is NOT imported eagerly. The upstream module
911
// (`@modelcontextprotocol/sdk/client/stdio.js`) touches `node:child_process`
@@ -42,6 +44,7 @@ export type RemoteConnectorInput = Omit<
4244
readonly headers?: Record<string, string>;
4345
readonly queryParams?: Record<string, string>;
4446
readonly authProvider?: OAuthClientProvider;
47+
readonly httpClientLayer?: Layer.Layer<HttpClient.HttpClient>;
4548
};
4649

4750
export type StdioConnectorInput = McpStdioIntegrationConfig;
@@ -60,6 +63,100 @@ const buildEndpointUrl = (endpoint: string, queryParams: Record<string, string>)
6063
return url;
6164
};
6265

66+
type HttpMethod = Parameters<typeof HttpClientRequest.make>[0];
67+
const HTTP_METHODS = new Set<HttpMethod>([
68+
"DELETE",
69+
"GET",
70+
"HEAD",
71+
"OPTIONS",
72+
"PATCH",
73+
"POST",
74+
"PUT",
75+
]);
76+
77+
const httpMethodFrom = (method: string | undefined): HttpMethod => {
78+
const normalized = (method ?? "GET").toUpperCase() as HttpMethod;
79+
return HTTP_METHODS.has(normalized) ? normalized : "POST";
80+
};
81+
82+
const headersFrom = (headers: HeadersInit | undefined): Headers =>
83+
headers ? new Headers(headers) : new Headers();
84+
85+
const recordFromHeaders = (headers: Headers): Record<string, string> =>
86+
Object.fromEntries(headers.entries());
87+
88+
const applyBody = async (
89+
request: HttpClientRequest.HttpClientRequest,
90+
headers: Headers,
91+
body: BodyInit | null | undefined,
92+
): Promise<HttpClientRequest.HttpClientRequest> => {
93+
if (body == null) return request;
94+
const contentType = headers.get("content-type") ?? undefined;
95+
if (typeof body === "string") return HttpClientRequest.bodyText(request, body, contentType);
96+
if (body instanceof URLSearchParams) {
97+
return HttpClientRequest.bodyText(
98+
request,
99+
body.toString(),
100+
contentType ?? "application/x-www-form-urlencoded;charset=UTF-8",
101+
);
102+
}
103+
if (body instanceof Uint8Array)
104+
return HttpClientRequest.bodyUint8Array(request, body, contentType);
105+
if (body instanceof ArrayBuffer) {
106+
return HttpClientRequest.bodyUint8Array(request, new Uint8Array(body), contentType);
107+
}
108+
const bytes = new Uint8Array(await new Response(body).arrayBuffer());
109+
return HttpClientRequest.bodyUint8Array(request, bytes, contentType);
110+
};
111+
112+
const abortError = (signal: AbortSignal): unknown => {
113+
if (signal.reason !== undefined) return signal.reason;
114+
// oxlint-disable-next-line executor/no-error-constructor -- boundary: Fetch-compatible adapter must reject with an AbortError-shaped value
115+
const error = new Error("The operation was aborted");
116+
error.name = "AbortError";
117+
return error;
118+
};
119+
120+
const fetchFromHttpClientLayer = (
121+
httpClientLayer: Layer.Layer<HttpClient.HttpClient>,
122+
): FetchLike => {
123+
const execute: FetchLike = async (url, init) => {
124+
const headers = headersFrom(init?.headers);
125+
const requestWithoutBody = HttpClientRequest.make(httpMethodFrom(init?.method))(url, {
126+
headers: recordFromHeaders(headers),
127+
});
128+
const request = await applyBody(requestWithoutBody, headers, init?.body);
129+
const effect = Effect.gen(function* () {
130+
const client = yield* HttpClient.HttpClient;
131+
const response = yield* client.execute(request);
132+
const responseHeaders = new Headers();
133+
for (const [key, value] of Object.entries(response.headers)) {
134+
if (value !== undefined) responseHeaders.set(key, value);
135+
}
136+
const body =
137+
response.status === 204 || response.status === 205 || response.status === 304
138+
? null
139+
: Stream.toReadableStream(response.stream);
140+
return new Response(body, {
141+
status: response.status,
142+
headers: responseHeaders,
143+
});
144+
}).pipe(Effect.provide(httpClientLayer));
145+
const promise = Effect.runPromise(effect);
146+
if (!init?.signal) return promise;
147+
// oxlint-disable-next-line executor/no-promise-reject -- boundary: Fetch-compatible adapter mirrors abort rejection semantics
148+
if (init.signal.aborted) return Promise.reject(abortError(init.signal));
149+
const aborted = new Promise<never>((_, reject) => {
150+
// oxlint-disable-next-line executor/no-promise-reject -- boundary: Fetch-compatible adapter races the Effect request against AbortSignal
151+
init.signal?.addEventListener("abort", () => reject(abortError(init.signal!)), {
152+
once: true,
153+
});
154+
});
155+
return Promise.race([promise, aborted]);
156+
};
157+
return execute;
158+
};
159+
63160
// Use the cfworker JSON Schema validator instead of the SDK's default
64161
// (Ajv). Ajv compiles schemas via `new Function(...)`, which throws
65162
// `Code generation from strings disallowed for this context` when the
@@ -157,6 +254,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => {
157254
const headers = input.headers ?? {};
158255
const remoteTransport = input.remoteTransport ?? "auto";
159256
const requestInit = Object.keys(headers).length > 0 ? { headers } : undefined;
257+
const fetch = input.httpClientLayer ? fetchFromHttpClientLayer(input.httpClientLayer) : undefined;
160258

161259
const endpoint = buildEndpointUrl(input.endpoint, input.queryParams ?? {});
162260

@@ -166,6 +264,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => {
166264
new StreamableHTTPClientTransport(endpoint, {
167265
requestInit,
168266
authProvider: input.authProvider,
267+
fetch,
169268
}),
170269
});
171270

@@ -175,6 +274,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => {
175274
new SSEClientTransport(endpoint, {
176275
requestInit,
177276
authProvider: input.authProvider,
277+
fetch,
178278
}),
179279
});
180280

packages/plugins/mcp/src/sdk/plugin.test.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import { describe, expect, it } from "@effect/vitest";
2-
import { Effect, Option, Predicate, Schema } from "effect";
3-
import { HttpServerResponse } from "effect/unstable/http";
2+
import { Effect, Layer, Option, Predicate, Schema } from "effect";
3+
import {
4+
HttpClient,
5+
HttpClientRequest,
6+
HttpClientResponse,
7+
HttpServerResponse,
8+
} from "effect/unstable/http";
49

510
import {
611
AuthTemplateSlug,
@@ -17,6 +22,7 @@ import {
1722
serveTestHttpApp,
1823
} from "@executor-js/sdk/testing";
1924

25+
import { createMcpConnector } from "./connection";
2026
import { mcpPlugin, userFacingProbeMessage } from "./plugin";
2127
import { McpInvocationError } from "./errors";
2228
import { extractManifestFromListToolsResult, deriveMcpNamespace, joinToolPath } from "./manifest";
@@ -299,6 +305,30 @@ describe("mcpPlugin", () => {
299305
}),
300306
);
301307

308+
it.effect("routes remote connector traffic through the provided HttpClient layer", () =>
309+
Effect.gen(function* () {
310+
const seen: string[] = [];
311+
const httpClientLayer = Layer.succeed(HttpClient.HttpClient)(
312+
HttpClient.make((request: HttpClientRequest.HttpClientRequest) => {
313+
seen.push(request.url);
314+
return Effect.succeed(
315+
HttpClientResponse.fromWeb(request, new Response("blocked", { status: 403 })),
316+
);
317+
}),
318+
);
319+
320+
const error = yield* createMcpConnector({
321+
transport: "remote",
322+
endpoint: "https://internal.example/mcp",
323+
remoteTransport: "streamable-http",
324+
httpClientLayer,
325+
}).pipe(Effect.flip);
326+
327+
expect(Predicate.isTagged(error, "McpConnectionError")).toBe(true);
328+
expect(seen).toEqual(["https://internal.example/mcp"]);
329+
}),
330+
);
331+
302332
it.effect("integration catalog has no configured MCP integrations initially", () =>
303333
Effect.gen(function* () {
304334
const executor = yield* createExecutor(makeTestConfig({ plugins: [mcpPlugin()] as const }));

packages/plugins/mcp/src/sdk/plugin.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ const buildConnectorInput = (
469469
values: Record<string, string | null>,
470470
templateSlug: string | null,
471471
allowStdio: boolean,
472+
httpClientLayer?: Layer.Layer<HttpClient.HttpClient>,
472473
): Effect.Effect<ConnectorInput, McpConnectionError> => {
473474
if (config.transport === "stdio") {
474475
if (!allowStdio) {
@@ -512,6 +513,7 @@ const buildConnectorInput = (
512513
queryParams: Object.keys(queryParams).length > 0 ? queryParams : undefined,
513514
headers: Object.keys(headers).length > 0 ? headers : undefined,
514515
authProvider,
516+
httpClientLayer,
515517
});
516518
};
517519

@@ -636,6 +638,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
636638
endpoint: trimmed,
637639
headers: probeHeaders,
638640
queryParams: probeQueryParams,
641+
httpClientLayer,
639642
});
640643

641644
const result = yield* discoverTools(connector).pipe(
@@ -929,7 +932,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
929932
// Discovery failures (auth not ready, server down) yield an empty tool set
930933
// rather than failing — the connection still lands and can be refreshed.
931934
// -----------------------------------------------------------------------
932-
resolveTools: ({ config, connection, template, getValues }) =>
935+
resolveTools: ({ config, connection, template, getValues, httpClientLayer }) =>
933936
Effect.gen(function* () {
934937
const parsed = parseMcpIntegrationConfig(config);
935938
if (!parsed) return { tools: [] as readonly ToolDef[] };
@@ -945,6 +948,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
945948
values,
946949
template === null ? null : String(template),
947950
allowStdio,
951+
httpClientLayer,
948952
).pipe(
949953
Effect.map((ci) => createMcpConnector(ci)),
950954
Effect.result,
@@ -968,7 +972,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
968972
}),
969973
) as Effect.Effect<{ readonly tools: readonly ToolDef[] }, StorageFailure>,
970974

971-
invokeTool: ({ toolRow, credential, args, elicit }) =>
975+
invokeTool: ({ ctx, toolRow, credential, args, elicit }) =>
972976
Effect.gen(function* () {
973977
const parsed = parseMcpIntegrationConfig(credential.config);
974978
if (!parsed) {
@@ -1013,6 +1017,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
10131017
credential.values,
10141018
String(credential.template),
10151019
allowStdio,
1020+
options?.httpClientLayer ?? ctx.httpClientLayer,
10161021
).pipe(Effect.map((ci) => createMcpConnector(ci)));
10171022

10181023
const raw = yield* invokeMcpTool({
@@ -1087,7 +1092,11 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
10871092
const name = parsed.value.hostname || "mcp";
10881093
const slug = deriveMcpNamespace({ endpoint: trimmed });
10891094

1090-
const connector = createMcpConnector({ transport: "remote", endpoint: trimmed });
1095+
const connector = createMcpConnector({
1096+
transport: "remote",
1097+
endpoint: trimmed,
1098+
httpClientLayer,
1099+
});
10911100

10921101
const connected = yield* discoverTools(connector).pipe(
10931102
Effect.map(() => true),

packages/plugins/openapi/src/sdk/spec-blob.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ describe("OpenAPI plugin — spec blob storage", () => {
161161
},
162162
template: null,
163163
storage,
164+
httpClientLayer: FetchHttpClient.layer,
164165
getValue: () => Effect.succeed(null),
165166
getValues: () => Effect.succeed({}),
166167
});

0 commit comments

Comments
 (0)