Skip to content

Commit 6dfcc47

Browse files
feat(server): BackchannelCompat; Server.buildContext outbound via ctx.mcpReq.send
BackchannelCompat: opt-in 2025-11 server-to-client backchannel for handleHttp. A handler's elicitInput/requestSampling writes onto the open POST's SSE stream and the client's POSTed-back response resolves the await. Server.buildContext factors elicitInput/requestSampling into _createMessageVia/_elicitInputVia so instance methods and ctx.mcpReq share the same validation and capability check (capability check reads instance _clientCapabilities; per-request flow in F1).
1 parent f34a8ff commit 6dfcc47

8 files changed

Lines changed: 339 additions & 32 deletions

File tree

packages/server/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export { Server } from './server/server.js';
3131
// StdioServerTransport is exported from the './stdio' subpath — server stdio has only type-level Node
3232
// imports (erased at compile time), but matching the client's `./stdio` subpath gives consumers a
3333
// consistent shape across packages.
34+
export { BackchannelCompat } from './server/backchannelCompat.js';
3435
export type { Dispatchable, HandleHttpOptions, HandleHttpRequestExtra } from './server/handleHttp.js';
3536
export { handleHttp } from './server/handleHttp.js';
3637
export type { SessionCompatOptions, SessionValidation } from './server/sessionCompat.js';
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import type {
2+
JSONRPCErrorResponse,
3+
JSONRPCMessage,
4+
JSONRPCRequest,
5+
JSONRPCResultResponse,
6+
Request,
7+
RequestOptions,
8+
Result
9+
} from '@modelcontextprotocol/core';
10+
import { DEFAULT_REQUEST_TIMEOUT_MSEC, isJSONRPCErrorResponse, ProtocolError, SdkError, SdkErrorCode } from '@modelcontextprotocol/core';
11+
12+
/**
13+
* Isolated 2025-11 server-to-client request backchannel for `handleHttp`.
14+
*
15+
* The 2025-11 protocol allows a server to send `elicitation/create` and
16+
* `sampling/createMessage` requests to the client mid-tool-call by writing them as
17+
* SSE events on the open POST response stream and waiting for the client to POST
18+
* the response back. This class owns the per-session `{requestId -> resolver}`
19+
* map that correlation requires.
20+
*
21+
* It exists so this stateful behaviour is in one removable file once MRTR
22+
* (SEP-2322) is the protocol floor and `env.send` becomes a hard error in
23+
* stateless paths.
24+
*/
25+
export class BackchannelCompat {
26+
private _pending = new Map<string, Map<number, { resolve: (r: Result) => void; reject: (e: Error) => void }>>();
27+
private _nextId = 0;
28+
29+
/**
30+
* Returns an `env.send` implementation bound to the given session and POST-stream writer.
31+
* The returned function writes the outbound JSON-RPC request to `writeSSE` and resolves when
32+
* {@linkcode handleResponse} is called for the same id on the same session.
33+
*
34+
* `writeSSE` returns `false` when the underlying stream is closed; the returned promise then
35+
* rejects immediately with `SendFailed` instead of waiting for the timeout.
36+
*
37+
* Backchannel writes are not persisted to the {@linkcode EventStore}, so a client that
38+
* disconnects mid-elicitation and resumes via `Last-Event-ID` will not see the outbound
39+
* request again; the awaiting handler will time out. This is a known limitation of the
40+
* legacy backchannel path; SEP-2322 (`ContinuationCompat`) is the resumable alternative.
41+
*/
42+
makeEnvSend(sessionId: string, writeSSE: (msg: JSONRPCMessage) => boolean): (req: Request, opts?: RequestOptions) => Promise<Result> {
43+
return (req: Request, opts?: RequestOptions): Promise<Result> => {
44+
return new Promise<Result>((resolve, reject) => {
45+
if (opts?.signal?.aborted) {
46+
reject(opts.signal.reason instanceof Error ? opts.signal.reason : new Error(String(opts.signal.reason)));
47+
return;
48+
}
49+
50+
const id = this._nextId++;
51+
const sessionMap = this._pending.get(sessionId) ?? new Map();
52+
this._pending.set(sessionId, sessionMap);
53+
54+
// eslint-disable-next-line prefer-const -- forward-referenced by cleanup() before assignment site
55+
let timer: ReturnType<typeof setTimeout> | undefined;
56+
const onAbort = () => {
57+
// Tell the client to stop processing, then reject locally.
58+
writeSSE({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: id } });
59+
settle.reject(opts!.signal!.reason instanceof Error ? opts!.signal!.reason : new Error(String(opts!.signal!.reason)));
60+
};
61+
const cleanup = () => {
62+
if (timer !== undefined) clearTimeout(timer);
63+
sessionMap.delete(id);
64+
if (sessionMap.size === 0) this._pending.delete(sessionId);
65+
opts?.signal?.removeEventListener('abort', onAbort);
66+
};
67+
const settle = {
68+
resolve: (r: Result) => {
69+
cleanup();
70+
resolve(r);
71+
},
72+
reject: (e: Error) => {
73+
cleanup();
74+
reject(e);
75+
}
76+
};
77+
sessionMap.set(id, settle);
78+
79+
const timeoutMs = opts?.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC;
80+
timer = setTimeout(
81+
() => settle.reject(new SdkError(SdkErrorCode.RequestTimeout, 'Request timed out', { timeout: timeoutMs })),
82+
timeoutMs
83+
);
84+
85+
opts?.signal?.addEventListener('abort', onAbort, { once: true });
86+
87+
const wire: JSONRPCRequest = { jsonrpc: '2.0', id, method: req.method, params: req.params };
88+
if (!writeSSE(wire)) {
89+
settle.reject(new SdkError(SdkErrorCode.SendFailed, 'Backchannel stream closed'));
90+
}
91+
});
92+
};
93+
}
94+
95+
/**
96+
* Routes an incoming JSON-RPC response (from a client POST) to the waiting `env.send` promise.
97+
* @returns true if a pending request matched and was settled.
98+
*/
99+
handleResponse(sessionId: string, response: JSONRPCResultResponse | JSONRPCErrorResponse): boolean {
100+
// We only mint numeric ids; a non-numeric id cannot be ours, so do not coerce
101+
// (string "0" must not claim numeric pending id 0).
102+
if (typeof response.id !== 'number') return false;
103+
const sessionMap = this._pending.get(sessionId);
104+
const settle = sessionMap?.get(response.id);
105+
if (!settle) return false;
106+
if (isJSONRPCErrorResponse(response)) {
107+
settle.reject(ProtocolError.fromError(response.error.code, response.error.message, response.error.data));
108+
} else {
109+
settle.resolve(response.result);
110+
}
111+
return true;
112+
}
113+
114+
/** Rejects all pending requests for a session and forgets it. */
115+
closeSession(sessionId: string): void {
116+
const sessionMap = this._pending.get(sessionId);
117+
if (!sessionMap) return;
118+
const err = new SdkError(SdkErrorCode.ConnectionClosed, 'Session closed');
119+
for (const s of sessionMap.values()) s.reject(err);
120+
this._pending.delete(sessionId);
121+
}
122+
}

packages/server/src/server/handleHttp.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ export interface Dispatchable {
3333
*
3434
* `mcp.connect(transport)` is not called; each HTTP request flows through
3535
* `mcp.dispatch()` directly. Supply a `SessionCompat` via `options.session`
36-
* to serve clients that send `Mcp-Session-Id` (the pre-2026-06 stateful flow).
36+
* to serve clients that send `Mcp-Session-Id` (the pre-2026-06 stateful flow),
37+
* and a `BackchannelCompat` via `options.backchannel` to let handlers'
38+
* `ctx.mcpReq.send` (e.g. `elicitInput`, `requestSampling`) reach those
39+
* clients over the open POST SSE stream.
3740
*/
3841
export function handleHttp(
3942
mcp: Dispatchable,

packages/server/src/server/server.ts

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import type {
2828
Result,
2929
ServerCapabilities,
3030
ServerContext,
31+
StandardSchemaV1,
3132
ToolResultContent,
3233
ToolUseContent
3334
} from '@modelcontextprotocol/core';
@@ -53,6 +54,13 @@ import { DefaultJsonSchemaValidator } from '@modelcontextprotocol/server/_shims'
5354

5455
import { ExperimentalServerTasks } from '../experimental/tasks/server.js';
5556

57+
/** Three-arg send used by `_createMessageVia`/`_elicitInputVia`; satisfied by both `_requestWithSchema` and `ctx.mcpReq.send`. */
58+
type SendWithSchema = <T extends StandardSchemaV1>(
59+
request: { method: string; params?: Record<string, unknown> },
60+
resultSchema: T,
61+
options?: RequestOptions
62+
) => Promise<StandardSchemaV1.InferOutput<T>>;
63+
5664
export type ServerOptions = ProtocolOptions & {
5765
/**
5866
* Capabilities to advertise as being supported by this server.
@@ -131,13 +139,18 @@ export class Server extends Protocol<ServerContext> {
131139
protected override buildContext(ctx: BaseContext, transportInfo?: MessageExtraInfo): ServerContext {
132140
// Only create http when there's actual HTTP transport info or auth info
133141
const hasHttpInfo = ctx.http || transportInfo?.request || transportInfo?.closeSSEStream || transportInfo?.closeStandaloneSSEStream;
142+
const sendOpts = (options?: RequestOptions): RequestOptions => ({ ...options, relatedRequestId: ctx.mcpReq.id });
134143
return {
135144
...ctx,
136145
mcpReq: {
137146
...ctx.mcpReq,
138-
log: (level, data, logger) => this.sendLoggingMessage({ level, data, logger }),
139-
elicitInput: (params, options) => this.elicitInput(params, options),
140-
requestSampling: (params, options) => this.createMessage(params, options)
147+
log: async (level, data, logger) => {
148+
if (this._capabilities.logging && !this.isMessageIgnored(level, ctx.sessionId)) {
149+
await ctx.mcpReq.notify({ method: 'notifications/message', params: { level, data, logger } });
150+
}
151+
},
152+
elicitInput: (params, options) => this._elicitInputVia(ctx.mcpReq.send, params, sendOpts(options)),
153+
requestSampling: (params, options) => this._createMessageVia(ctx.mcpReq.send, params, sendOpts(options))
141154
},
142155
http: hasHttpInfo
143156
? {
@@ -441,14 +454,31 @@ export class Server extends Protocol<ServerContext> {
441454
params: CreateMessageRequest['params'],
442455
options?: RequestOptions
443456
): Promise<CreateMessageResult | CreateMessageResultWithTools> {
444-
// Capability check - only required when tools/toolChoice are provided
445-
if ((params.tools || params.toolChoice) && !this._clientCapabilities?.sampling?.tools) {
457+
return this._createMessageVia((r, schema, opts) => this._requestWithSchema(r, schema, opts), params, options);
458+
}
459+
460+
/**
461+
* Shared body for {@linkcode createMessage} and `ctx.mcpReq.requestSampling`: capability check,
462+
* tool_use/tool_result pairing validation, and result-schema selection. The `send` argument
463+
* routes to either the connected driver (instance method) or `ctx.mcpReq.send` (per-request).
464+
*
465+
* NOTE: the capability check below reads `this._clientCapabilities`, which is a singleton
466+
* (set on the most recent `initialize`). For multi-session `handleHttp` deployments this
467+
* can read a different session's caps. The per-request `_meta.clientCapabilities` flow
468+
* fixes this in a follow-up; until then, do not rely on the cap gate for isolation.
469+
*/
470+
private async _createMessageVia(
471+
send: SendWithSchema,
472+
params: CreateMessageRequest['params'],
473+
options?: RequestOptions
474+
): Promise<CreateMessageResult | CreateMessageResultWithTools> {
475+
if (!this._clientCapabilities?.sampling) {
476+
throw new SdkError(SdkErrorCode.CapabilityNotSupported, 'Client does not support sampling capability.');
477+
}
478+
if ((params.tools || params.toolChoice) && !this._clientCapabilities.sampling.tools) {
446479
throw new SdkError(SdkErrorCode.CapabilityNotSupported, 'Client does not support sampling tools capability.');
447480
}
448481

449-
// Message structure validation - always validate tool_use/tool_result pairs.
450-
// These may appear even without tools/toolChoice in the current request when
451-
// a previous sampling request returned tool_use and this is a follow-up with results.
452482
if (params.messages.length > 0) {
453483
const lastMessage = params.messages.at(-1)!;
454484
const lastContent = Array.isArray(lastMessage.content) ? lastMessage.content : [lastMessage.content];
@@ -490,11 +520,10 @@ export class Server extends Protocol<ServerContext> {
490520
}
491521
}
492522

493-
// Use different schemas based on whether tools are provided
494523
if (params.tools) {
495-
return this._requestWithSchema({ method: 'sampling/createMessage', params }, CreateMessageResultWithToolsSchema, options);
524+
return send({ method: 'sampling/createMessage', params }, CreateMessageResultWithToolsSchema, options);
496525
}
497-
return this._requestWithSchema({ method: 'sampling/createMessage', params }, CreateMessageResultSchema, options);
526+
return send({ method: 'sampling/createMessage', params }, CreateMessageResultSchema, options);
498527
}
499528

500529
/**
@@ -505,46 +534,49 @@ export class Server extends Protocol<ServerContext> {
505534
* @returns The result of the elicitation request.
506535
*/
507536
async elicitInput(params: ElicitRequestFormParams | ElicitRequestURLParams, options?: RequestOptions): Promise<ElicitResult> {
537+
return this._elicitInputVia((r, schema, opts) => this._requestWithSchema(r, schema, opts), params, options);
538+
}
539+
540+
/**
541+
* Shared body for {@linkcode elicitInput} and `ctx.mcpReq.elicitInput`: form/url capability
542+
* sub-field check, mode defaulting, and post-receipt JSON-schema validation of `content`.
543+
*/
544+
private async _elicitInputVia(
545+
send: SendWithSchema,
546+
params: ElicitRequestFormParams | ElicitRequestURLParams,
547+
options?: RequestOptions
548+
): Promise<ElicitResult> {
508549
const mode = (params.mode ?? 'form') as 'form' | 'url';
509550

510551
switch (mode) {
511552
case 'url': {
512553
if (!this._clientCapabilities?.elicitation?.url) {
513554
throw new SdkError(SdkErrorCode.CapabilityNotSupported, 'Client does not support url elicitation.');
514555
}
515-
516556
const urlParams = params as ElicitRequestURLParams;
517-
return this._requestWithSchema({ method: 'elicitation/create', params: urlParams }, ElicitResultSchema, options);
557+
return send({ method: 'elicitation/create', params: urlParams }, ElicitResultSchema, options);
518558
}
519559
case 'form': {
520560
if (!this._clientCapabilities?.elicitation?.form) {
521561
throw new SdkError(SdkErrorCode.CapabilityNotSupported, 'Client does not support form elicitation.');
522562
}
523-
524563
const formParams: ElicitRequestFormParams =
525564
params.mode === 'form' ? (params as ElicitRequestFormParams) : { ...(params as ElicitRequestFormParams), mode: 'form' };
526565

527-
const result = await this._requestWithSchema(
528-
{ method: 'elicitation/create', params: formParams },
529-
ElicitResultSchema,
530-
options
531-
);
566+
const result = await send({ method: 'elicitation/create', params: formParams }, ElicitResultSchema, options);
532567

533568
if (result.action === 'accept' && result.content && formParams.requestedSchema) {
534569
try {
535570
const validator = this._jsonSchemaValidator.getValidator(formParams.requestedSchema as JsonSchemaType);
536571
const validationResult = validator(result.content);
537-
538572
if (!validationResult.valid) {
539573
throw new ProtocolError(
540574
ProtocolErrorCode.InvalidParams,
541575
`Elicitation response content does not match requested schema: ${validationResult.errorMessage}`
542576
);
543577
}
544578
} catch (error) {
545-
if (error instanceof ProtocolError) {
546-
throw error;
547-
}
579+
if (error instanceof ProtocolError) throw error;
548580
throw new ProtocolError(
549581
ProtocolErrorCode.InternalError,
550582
`Error validating elicitation response: ${error instanceof Error ? error.message : String(error)}`

packages/server/src/server/shttpHandler.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
SUPPORTED_PROTOCOL_VERSIONS
2020
} from '@modelcontextprotocol/core';
2121

22+
import type { BackchannelCompat } from './backchannelCompat.js';
2223
import type { SessionCompat } from './sessionCompat.js';
2324
import type { EventId, EventStore } from './streamableHttp.js';
2425

@@ -54,6 +55,15 @@ export interface ShttpHandlerOptions {
5455
*/
5556
session?: SessionCompat;
5657

58+
/**
59+
* Pre-2026-06 server-to-client request backchannel. When provided alongside `session`,
60+
* a handler's `ctx.mcpReq.send` (e.g. `elicitInput`, `requestSampling`) writes the
61+
* outbound request onto the open POST's SSE stream and the client's POSTed-back
62+
* response resolves the awaited promise. When omitted, `ctx.mcpReq.send` rejects
63+
* with `NotConnected` on this path.
64+
*/
65+
backchannel?: BackchannelCompat;
66+
5767
/**
5868
* Event store for SSE resumability via `Last-Event-ID`. When configured, every
5969
* outgoing SSE event is persisted and a priming event is sent at stream start.
@@ -145,6 +155,8 @@ const SSE_HEADERS: Record<string, string> = {
145155
Connection: 'keep-alive'
146156
};
147157

158+
const wiredBackchannels = new WeakMap<SessionCompat, WeakSet<BackchannelCompat>>();
159+
148160
/**
149161
* Creates a Web-standard `(Request) => Promise<Response>` handler for the MCP Streamable HTTP
150162
* transport, driven by {@linkcode ShttpCallbacks.onrequest} per request.
@@ -161,11 +173,24 @@ export function shttpHandler(
161173
): (req: Request, extra?: ShttpRequestExtra) => Promise<Response> {
162174
const enableJsonResponse = options.enableJsonResponse ?? false;
163175
const session = options.session;
176+
const backchannel = options.backchannel;
164177
const eventStore = options.eventStore;
165178
const retryInterval = options.retryInterval;
166179
const supportedProtocolVersions = options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS;
167180
const onerror = options.onerror;
168181

182+
// Reject any pending backchannel sends on session close (DELETE, idle/capacity eviction).
183+
// Guard so repeated shttpHandler()/handleHttp() calls on the same session+backchannel
184+
// pair do not accumulate listeners.
185+
if (session && backchannel) {
186+
let wired = wiredBackchannels.get(session);
187+
if (!wired) wiredBackchannels.set(session, (wired = new WeakSet()));
188+
if (!wired.has(backchannel)) {
189+
wired.add(backchannel);
190+
session.addCloseListener(sid => backchannel.closeSession(sid));
191+
}
192+
}
193+
169194
/**
170195
* Per-request abort controllers for `notifications/cancelled`. Keyed by
171196
* `(sessionId, requestId)` so concurrent sessions reusing the same JSON-RPC id don't collide.
@@ -283,7 +308,8 @@ export function shttpHandler(
283308
}
284309

285310
for (const r of responses) {
286-
if (cb.onresponse?.(r) === false) {
311+
const claimedByBackchannel = backchannel && sessionId !== undefined && backchannel.handleResponse(sessionId, r);
312+
if (!claimedByBackchannel && cb.onresponse?.(r) === false) {
287313
onerror?.(new Error(`Unclaimed JSON-RPC response (id=${String(r.id)}); no pending server-initiated request matched.`));
288314
}
289315
}
@@ -359,7 +385,14 @@ export function shttpHandler(
359385
closeStandaloneSSEStream:
360386
supportsPolling && sessionId !== undefined ? () => session?.closeStandaloneStream(sessionId) : undefined
361387
};
362-
const env: ShttpRequestEnv = { ...baseEnv, _transportExtra: transportExtra };
388+
// Backchannel writes go straight to writeSSEEvent (synchronous boolean) so a closed
389+
// stream surfaces as `false` immediately instead of hanging until the timeout.
390+
const writeSSE = (msg: JSONRPCMessage): boolean => writeSSEEvent(controller, encoder, msg);
391+
const env: ShttpRequestEnv = {
392+
...baseEnv,
393+
_transportExtra: transportExtra,
394+
...(backchannel && sessionId !== undefined ? { send: backchannel.makeEnvSend(sessionId, writeSSE) } : {})
395+
};
363396
void (async () => {
364397
try {
365398
await writePrimingEvent(controller, encoder, streamId, clientProtocolVersion);

0 commit comments

Comments
 (0)