@@ -2,8 +2,10 @@ import type { OAuthClientProvider } from "@modelcontextprotocol/sdk/client/auth.
22import { Client } from "@modelcontextprotocol/sdk/client/index.js" ;
33import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js" ;
44import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js" ;
5+ import type { FetchLike } from "@modelcontextprotocol/sdk/shared/transport.js" ;
56import { CfWorkerJsonSchemaValidator } from "@modelcontextprotocol/sdk/validation/cfworker" ;
6- import { Effect , Predicate } from "effect" ;
7+ import { Effect , Layer , Predicate , Stream } from "effect" ;
8+ import { HttpClient , HttpClientRequest } from "effect/unstable/http" ;
79
810// NOTE: `StdioClientTransport` is NOT imported eagerly. The upstream module
911// (`@modelcontextprotocol/sdk/client/stdio.js`) touches `node:child_process`
@@ -42,6 +44,7 @@ export type RemoteConnectorInput = Omit<
4244 readonly headers ?: Record < string , string > ;
4345 readonly queryParams ?: Record < string , string > ;
4446 readonly authProvider ?: OAuthClientProvider ;
47+ readonly httpClientLayer ?: Layer . Layer < HttpClient . HttpClient > ;
4548} ;
4649
4750export type StdioConnectorInput = McpStdioIntegrationConfig ;
@@ -60,6 +63,100 @@ const buildEndpointUrl = (endpoint: string, queryParams: Record<string, string>)
6063 return url ;
6164} ;
6265
66+ type HttpMethod = Parameters < typeof HttpClientRequest . make > [ 0 ] ;
67+ const HTTP_METHODS = new Set < HttpMethod > ( [
68+ "DELETE" ,
69+ "GET" ,
70+ "HEAD" ,
71+ "OPTIONS" ,
72+ "PATCH" ,
73+ "POST" ,
74+ "PUT" ,
75+ ] ) ;
76+
77+ const httpMethodFrom = ( method : string | undefined ) : HttpMethod => {
78+ const normalized = ( method ?? "GET" ) . toUpperCase ( ) as HttpMethod ;
79+ return HTTP_METHODS . has ( normalized ) ? normalized : "POST" ;
80+ } ;
81+
82+ const headersFrom = ( headers : HeadersInit | undefined ) : Headers =>
83+ headers ? new Headers ( headers ) : new Headers ( ) ;
84+
85+ const recordFromHeaders = ( headers : Headers ) : Record < string , string > =>
86+ Object . fromEntries ( headers . entries ( ) ) ;
87+
88+ const applyBody = async (
89+ request : HttpClientRequest . HttpClientRequest ,
90+ headers : Headers ,
91+ body : BodyInit | null | undefined ,
92+ ) : Promise < HttpClientRequest . HttpClientRequest > => {
93+ if ( body == null ) return request ;
94+ const contentType = headers . get ( "content-type" ) ?? undefined ;
95+ if ( typeof body === "string" ) return HttpClientRequest . bodyText ( request , body , contentType ) ;
96+ if ( body instanceof URLSearchParams ) {
97+ return HttpClientRequest . bodyText (
98+ request ,
99+ body . toString ( ) ,
100+ contentType ?? "application/x-www-form-urlencoded;charset=UTF-8" ,
101+ ) ;
102+ }
103+ if ( body instanceof Uint8Array )
104+ return HttpClientRequest . bodyUint8Array ( request , body , contentType ) ;
105+ if ( body instanceof ArrayBuffer ) {
106+ return HttpClientRequest . bodyUint8Array ( request , new Uint8Array ( body ) , contentType ) ;
107+ }
108+ const bytes = new Uint8Array ( await new Response ( body ) . arrayBuffer ( ) ) ;
109+ return HttpClientRequest . bodyUint8Array ( request , bytes , contentType ) ;
110+ } ;
111+
112+ const abortError = ( signal : AbortSignal ) : unknown => {
113+ if ( signal . reason !== undefined ) return signal . reason ;
114+ // oxlint-disable-next-line executor/no-error-constructor -- boundary: Fetch-compatible adapter must reject with an AbortError-shaped value
115+ const error = new Error ( "The operation was aborted" ) ;
116+ error . name = "AbortError" ;
117+ return error ;
118+ } ;
119+
120+ const fetchFromHttpClientLayer = (
121+ httpClientLayer : Layer . Layer < HttpClient . HttpClient > ,
122+ ) : FetchLike => {
123+ const execute : FetchLike = async ( url , init ) => {
124+ const headers = headersFrom ( init ?. headers ) ;
125+ const requestWithoutBody = HttpClientRequest . make ( httpMethodFrom ( init ?. method ) ) ( url , {
126+ headers : recordFromHeaders ( headers ) ,
127+ } ) ;
128+ const request = await applyBody ( requestWithoutBody , headers , init ?. body ) ;
129+ const effect = Effect . gen ( function * ( ) {
130+ const client = yield * HttpClient . HttpClient ;
131+ const response = yield * client . execute ( request ) ;
132+ const responseHeaders = new Headers ( ) ;
133+ for ( const [ key , value ] of Object . entries ( response . headers ) ) {
134+ if ( value !== undefined ) responseHeaders . set ( key , value ) ;
135+ }
136+ const body =
137+ response . status === 204 || response . status === 205 || response . status === 304
138+ ? null
139+ : Stream . toReadableStream ( response . stream ) ;
140+ return new Response ( body , {
141+ status : response . status ,
142+ headers : responseHeaders ,
143+ } ) ;
144+ } ) . pipe ( Effect . provide ( httpClientLayer ) ) ;
145+ const promise = Effect . runPromise ( effect ) ;
146+ if ( ! init ?. signal ) return promise ;
147+ // oxlint-disable-next-line executor/no-promise-reject -- boundary: Fetch-compatible adapter mirrors abort rejection semantics
148+ if ( init . signal . aborted ) return Promise . reject ( abortError ( init . signal ) ) ;
149+ const aborted = new Promise < never > ( ( _ , reject ) => {
150+ // oxlint-disable-next-line executor/no-promise-reject -- boundary: Fetch-compatible adapter races the Effect request against AbortSignal
151+ init . signal ?. addEventListener ( "abort" , ( ) => reject ( abortError ( init . signal ! ) ) , {
152+ once : true ,
153+ } ) ;
154+ } ) ;
155+ return Promise . race ( [ promise , aborted ] ) ;
156+ } ;
157+ return execute ;
158+ } ;
159+
63160// Use the cfworker JSON Schema validator instead of the SDK's default
64161// (Ajv). Ajv compiles schemas via `new Function(...)`, which throws
65162// `Code generation from strings disallowed for this context` when the
@@ -157,6 +254,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => {
157254 const headers = input . headers ?? { } ;
158255 const remoteTransport = input . remoteTransport ?? "auto" ;
159256 const requestInit = Object . keys ( headers ) . length > 0 ? { headers } : undefined ;
257+ const fetch = input . httpClientLayer ? fetchFromHttpClientLayer ( input . httpClientLayer ) : undefined ;
160258
161259 const endpoint = buildEndpointUrl ( input . endpoint , input . queryParams ?? { } ) ;
162260
@@ -166,6 +264,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => {
166264 new StreamableHTTPClientTransport ( endpoint , {
167265 requestInit,
168266 authProvider : input . authProvider ,
267+ fetch,
169268 } ) ,
170269 } ) ;
171270
@@ -175,6 +274,7 @@ export const createMcpConnector = (input: ConnectorInput): McpConnector => {
175274 new SSEClientTransport ( endpoint , {
176275 requestInit,
177276 authProvider : input . authProvider ,
277+ fetch,
178278 } ) ,
179279 } ) ;
180280
0 commit comments