Skip to content

Commit 3eebc7f

Browse files
feat(server): BackchannelCompat; Server.buildContext outbound via ctx.mcpReq.send
BackchannelCompat: opt-in 2025-11 server-to-client backchannel for handleHttp. A handler's elicitInput/requestSampling writes onto the open POST's SSE stream and the client's POSTed-back response resolves the await. Server.buildContext factors elicitInput/requestSampling into _createMessageVia/_elicitInputVia so instance methods and ctx.mcpReq share the same validation and capability check (capability check reads instance _clientCapabilities; per-request flow in F1).
1 parent 7758dfc commit 3eebc7f

8 files changed

Lines changed: 340 additions & 34 deletions

File tree

packages/server/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export { Server } from './server/server.js';
3131
// StdioServerTransport is exported from the './stdio' subpath — server stdio has only type-level Node
3232
// imports (erased at compile time), but matching the client's `./stdio` subpath gives consumers a
3333
// consistent shape across packages.
34+
export { BackchannelCompat } from './server/backchannelCompat.js';
3435
export type { Dispatchable, HandleHttpOptions, HandleHttpRequestExtra } from './server/handleHttp.js';
3536
export { handleHttp } from './server/handleHttp.js';
3637
export type { SessionCompatOptions, SessionValidation } from './server/sessionCompat.js';
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import type {
2+
JSONRPCErrorResponse,
3+
JSONRPCMessage,
4+
JSONRPCRequest,
5+
JSONRPCResultResponse,
6+
Request,
7+
RequestOptions,
8+
Result
9+
} from '@modelcontextprotocol/core';
10+
import { DEFAULT_REQUEST_TIMEOUT_MSEC, isJSONRPCErrorResponse, ProtocolError, SdkError, SdkErrorCode } from '@modelcontextprotocol/core';
11+
12+
/**
13+
* Isolated 2025-11 server-to-client request backchannel for `handleHttp`.
14+
*
15+
* The 2025-11 protocol allows a server to send `elicitation/create` and
16+
* `sampling/createMessage` requests to the client mid-tool-call by writing them as
17+
* SSE events on the open POST response stream and waiting for the client to POST
18+
* the response back. This class owns the per-session `{requestId -> resolver}`
19+
* map that correlation requires.
20+
*
21+
* It exists so this stateful behaviour is in one removable file once MRTR
22+
* (SEP-2322) is the protocol floor and `env.send` becomes a hard error in
23+
* stateless paths.
24+
*/
25+
export class BackchannelCompat {
26+
private _pending = new Map<string, Map<number, { resolve: (r: Result) => void; reject: (e: Error) => void }>>();
27+
private _nextId = 0;
28+
29+
/**
30+
* Returns an `env.send` implementation bound to the given session and POST-stream writer.
31+
* The returned function writes the outbound JSON-RPC request to `writeSSE` and resolves when
32+
* {@linkcode handleResponse} is called for the same id on the same session.
33+
*
34+
* `writeSSE` returns `false` when the underlying stream is closed; the returned promise then
35+
* rejects immediately with `SendFailed` instead of waiting for the timeout.
36+
*
37+
* Backchannel writes are not persisted to the {@linkcode EventStore}, so a client that
38+
* disconnects mid-elicitation and resumes via `Last-Event-ID` will not see the outbound
39+
* request again; the awaiting handler will time out. This is a known limitation of the
40+
* legacy backchannel path; SEP-2322 (`ContinuationCompat`) is the resumable alternative.
41+
*/
42+
makeEnvSend(sessionId: string, writeSSE: (msg: JSONRPCMessage) => boolean): (req: Request, opts?: RequestOptions) => Promise<Result> {
43+
return (req: Request, opts?: RequestOptions): Promise<Result> => {
44+
return new Promise<Result>((resolve, reject) => {
45+
if (opts?.signal?.aborted) {
46+
reject(opts.signal.reason instanceof Error ? opts.signal.reason : new Error(String(opts.signal.reason)));
47+
return;
48+
}
49+
50+
const id = this._nextId++;
51+
const sessionMap = this._pending.get(sessionId) ?? new Map();
52+
this._pending.set(sessionId, sessionMap);
53+
54+
// eslint-disable-next-line prefer-const -- forward-referenced by cleanup() before assignment site
55+
let timer: ReturnType<typeof setTimeout> | undefined;
56+
const onAbort = () => {
57+
// Tell the client to stop processing, then reject locally.
58+
writeSSE({ jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: id } });
59+
settle.reject(opts!.signal!.reason instanceof Error ? opts!.signal!.reason : new Error(String(opts!.signal!.reason)));
60+
};
61+
const cleanup = () => {
62+
if (timer !== undefined) clearTimeout(timer);
63+
sessionMap.delete(id);
64+
if (sessionMap.size === 0) this._pending.delete(sessionId);
65+
opts?.signal?.removeEventListener('abort', onAbort);
66+
};
67+
const settle = {
68+
resolve: (r: Result) => {
69+
cleanup();
70+
resolve(r);
71+
},
72+
reject: (e: Error) => {
73+
cleanup();
74+
reject(e);
75+
}
76+
};
77+
sessionMap.set(id, settle);
78+
79+
const timeoutMs = opts?.timeout ?? DEFAULT_REQUEST_TIMEOUT_MSEC;
80+
timer = setTimeout(
81+
() => settle.reject(new SdkError(SdkErrorCode.RequestTimeout, 'Request timed out', { timeout: timeoutMs })),
82+
timeoutMs
83+
);
84+
85+
opts?.signal?.addEventListener('abort', onAbort, { once: true });
86+
87+
const wire: JSONRPCRequest = { jsonrpc: '2.0', id, method: req.method, params: req.params };
88+
if (!writeSSE(wire)) {
89+
settle.reject(new SdkError(SdkErrorCode.SendFailed, 'Backchannel stream closed'));
90+
}
91+
});
92+
};
93+
}
94+
95+
/**
96+
* Routes an incoming JSON-RPC response (from a client POST) to the waiting `env.send` promise.
97+
* @returns true if a pending request matched and was settled.
98+
*/
99+
handleResponse(sessionId: string, response: JSONRPCResultResponse | JSONRPCErrorResponse): boolean {
100+
// We only mint numeric ids; a non-numeric id cannot be ours, so do not coerce
101+
// (string "0" must not claim numeric pending id 0).
102+
if (typeof response.id !== 'number') return false;
103+
const sessionMap = this._pending.get(sessionId);
104+
const settle = sessionMap?.get(response.id);
105+
if (!settle) return false;
106+
if (isJSONRPCErrorResponse(response)) {
107+
settle.reject(ProtocolError.fromError(response.error.code, response.error.message, response.error.data));
108+
} else {
109+
settle.resolve(response.result);
110+
}
111+
return true;
112+
}
113+
114+
/** Rejects all pending requests for a session and forgets it. */
115+
closeSession(sessionId: string): void {
116+
const sessionMap = this._pending.get(sessionId);
117+
if (!sessionMap) return;
118+
const err = new SdkError(SdkErrorCode.ConnectionClosed, 'Session closed');
119+
for (const s of sessionMap.values()) s.reject(err);
120+
this._pending.delete(sessionId);
121+
}
122+
}

packages/server/src/server/handleHttp.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ export interface Dispatchable {
3333
*
3434
* `mcp.connect(transport)` is not called; each HTTP request flows through
3535
* `mcp.dispatch()` directly. Supply a `SessionCompat` via `options.session`
36-
* to serve clients that send `Mcp-Session-Id` (the pre-2026-06 stateful flow).
36+
* to serve clients that send `Mcp-Session-Id` (the pre-2026-06 stateful flow),
37+
* and a `BackchannelCompat` via `options.backchannel` to let handlers'
38+
* `ctx.mcpReq.send` (e.g. `elicitInput`, `requestSampling`) reach those
39+
* clients over the open POST SSE stream.
3740
*/
3841
export function handleHttp(
3942
mcp: Dispatchable,

packages/server/src/server/server.ts

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import type {
2828
Result,
2929
ServerCapabilities,
3030
ServerContext,
31+
StandardSchemaV1,
3132
TaskManagerOptions,
3233
ToolResultContent,
3334
ToolUseContent
@@ -54,6 +55,13 @@ import { DefaultJsonSchemaValidator } from '@modelcontextprotocol/server/_shims'
5455

5556
import { ExperimentalServerTasks } from '../experimental/tasks/server.js';
5657

58+
/** Three-arg send used by `_createMessageVia`/`_elicitInputVia`; satisfied by both `_requestWithSchema` and `ctx.mcpReq.send`. */
59+
type SendWithSchema = <T extends StandardSchemaV1>(
60+
request: { method: string; params?: Record<string, unknown> },
61+
resultSchema: T,
62+
options?: RequestOptions
63+
) => Promise<StandardSchemaV1.InferOutput<T>>;
64+
5765
/**
5866
* Extended tasks capability that includes runtime configuration (store, messageQueue).
5967
* The runtime-only fields are stripped before advertising capabilities to clients.
@@ -148,13 +156,18 @@ export class Server extends Protocol<ServerContext> {
148156
protected override buildContext(ctx: BaseContext, transportInfo?: MessageExtraInfo): ServerContext {
149157
// Only create http when there's actual HTTP transport info or auth info
150158
const hasHttpInfo = ctx.http || transportInfo?.request || transportInfo?.closeSSEStream || transportInfo?.closeStandaloneSSEStream;
159+
const sendOpts = (options?: RequestOptions): RequestOptions => ({ ...options, relatedRequestId: ctx.mcpReq.id });
151160
return {
152161
...ctx,
153162
mcpReq: {
154163
...ctx.mcpReq,
155-
log: (level, data, logger) => this.sendLoggingMessage({ level, data, logger }),
156-
elicitInput: (params, options) => this.elicitInput(params, options),
157-
requestSampling: (params, options) => this.createMessage(params, options)
164+
log: async (level, data, logger) => {
165+
if (this._capabilities.logging && !this.isMessageIgnored(level, ctx.sessionId)) {
166+
await ctx.mcpReq.notify({ method: 'notifications/message', params: { level, data, logger } });
167+
}
168+
},
169+
elicitInput: (params, options) => this._elicitInputVia(ctx.mcpReq.send, params, sendOpts(options)),
170+
requestSampling: (params, options) => this._createMessageVia(ctx.mcpReq.send, params, sendOpts(options))
158171
},
159172
http: hasHttpInfo
160173
? {
@@ -458,14 +471,31 @@ export class Server extends Protocol<ServerContext> {
458471
params: CreateMessageRequest['params'],
459472
options?: RequestOptions
460473
): Promise<CreateMessageResult | CreateMessageResultWithTools> {
461-
// Capability check - only required when tools/toolChoice are provided
462-
if ((params.tools || params.toolChoice) && !this._clientCapabilities?.sampling?.tools) {
474+
return this._createMessageVia((r, schema, opts) => this._requestWithSchema(r, schema, opts), params, options);
475+
}
476+
477+
/**
478+
* Shared body for {@linkcode createMessage} and `ctx.mcpReq.requestSampling`: capability check,
479+
* tool_use/tool_result pairing validation, and result-schema selection. The `send` argument
480+
* routes to either the connected driver (instance method) or `ctx.mcpReq.send` (per-request).
481+
*
482+
* NOTE: the capability check below reads `this._clientCapabilities`, which is a singleton
483+
* (set on the most recent `initialize`). For multi-session `handleHttp` deployments this
484+
* can read a different session's caps. The per-request `_meta.clientCapabilities` flow
485+
* fixes this in a follow-up; until then, do not rely on the cap gate for isolation.
486+
*/
487+
private async _createMessageVia(
488+
send: SendWithSchema,
489+
params: CreateMessageRequest['params'],
490+
options?: RequestOptions
491+
): Promise<CreateMessageResult | CreateMessageResultWithTools> {
492+
if (!this._clientCapabilities?.sampling) {
493+
throw new SdkError(SdkErrorCode.CapabilityNotSupported, 'Client does not support sampling capability.');
494+
}
495+
if ((params.tools || params.toolChoice) && !this._clientCapabilities.sampling.tools) {
463496
throw new SdkError(SdkErrorCode.CapabilityNotSupported, 'Client does not support sampling tools capability.');
464497
}
465498

466-
// Message structure validation - always validate tool_use/tool_result pairs.
467-
// These may appear even without tools/toolChoice in the current request when
468-
// a previous sampling request returned tool_use and this is a follow-up with results.
469499
if (params.messages.length > 0) {
470500
const lastMessage = params.messages.at(-1)!;
471501
const lastContent = Array.isArray(lastMessage.content) ? lastMessage.content : [lastMessage.content];
@@ -507,11 +537,10 @@ export class Server extends Protocol<ServerContext> {
507537
}
508538
}
509539

510-
// Use different schemas based on whether tools are provided
511540
if (params.tools) {
512-
return this._requestWithSchema({ method: 'sampling/createMessage', params }, CreateMessageResultWithToolsSchema, options);
541+
return send({ method: 'sampling/createMessage', params }, CreateMessageResultWithToolsSchema, options);
513542
}
514-
return this._requestWithSchema({ method: 'sampling/createMessage', params }, CreateMessageResultSchema, options);
543+
return send({ method: 'sampling/createMessage', params }, CreateMessageResultSchema, options);
515544
}
516545

517546
/**
@@ -522,46 +551,49 @@ export class Server extends Protocol<ServerContext> {
522551
* @returns The result of the elicitation request.
523552
*/
524553
async elicitInput(params: ElicitRequestFormParams | ElicitRequestURLParams, options?: RequestOptions): Promise<ElicitResult> {
554+
return this._elicitInputVia((r, schema, opts) => this._requestWithSchema(r, schema, opts), params, options);
555+
}
556+
557+
/**
558+
* Shared body for {@linkcode elicitInput} and `ctx.mcpReq.elicitInput`: form/url capability
559+
* sub-field check, mode defaulting, and post-receipt JSON-schema validation of `content`.
560+
*/
561+
private async _elicitInputVia(
562+
send: SendWithSchema,
563+
params: ElicitRequestFormParams | ElicitRequestURLParams,
564+
options?: RequestOptions
565+
): Promise<ElicitResult> {
525566
const mode = (params.mode ?? 'form') as 'form' | 'url';
526567

527568
switch (mode) {
528569
case 'url': {
529570
if (!this._clientCapabilities?.elicitation?.url) {
530571
throw new SdkError(SdkErrorCode.CapabilityNotSupported, 'Client does not support url elicitation.');
531572
}
532-
533573
const urlParams = params as ElicitRequestURLParams;
534-
return this._requestWithSchema({ method: 'elicitation/create', params: urlParams }, ElicitResultSchema, options);
574+
return send({ method: 'elicitation/create', params: urlParams }, ElicitResultSchema, options);
535575
}
536576
case 'form': {
537577
if (!this._clientCapabilities?.elicitation?.form) {
538578
throw new SdkError(SdkErrorCode.CapabilityNotSupported, 'Client does not support form elicitation.');
539579
}
540-
541580
const formParams: ElicitRequestFormParams =
542581
params.mode === 'form' ? (params as ElicitRequestFormParams) : { ...(params as ElicitRequestFormParams), mode: 'form' };
543582

544-
const result = await this._requestWithSchema(
545-
{ method: 'elicitation/create', params: formParams },
546-
ElicitResultSchema,
547-
options
548-
);
583+
const result = await send({ method: 'elicitation/create', params: formParams }, ElicitResultSchema, options);
549584

550585
if (result.action === 'accept' && result.content && formParams.requestedSchema) {
551586
try {
552587
const validator = this._jsonSchemaValidator.getValidator(formParams.requestedSchema as JsonSchemaType);
553588
const validationResult = validator(result.content);
554-
555589
if (!validationResult.valid) {
556590
throw new ProtocolError(
557591
ProtocolErrorCode.InvalidParams,
558592
`Elicitation response content does not match requested schema: ${validationResult.errorMessage}`
559593
);
560594
}
561595
} catch (error) {
562-
if (error instanceof ProtocolError) {
563-
throw error;
564-
}
596+
if (error instanceof ProtocolError) throw error;
565597
throw new ProtocolError(
566598
ProtocolErrorCode.InternalError,
567599
`Error validating elicitation response: ${error instanceof Error ? error.message : String(error)}`

0 commit comments

Comments
 (0)