Skip to content

Commit 7921183

Browse files
feat(client): MRTR retry loop, ClientTransport interface, version-gate notifications/initialized (SEP-2322/2575)
1 parent c1b00ac commit 7921183

4 files changed

Lines changed: 531 additions & 6 deletions

File tree

packages/client/src/client/client.ts

Lines changed: 246 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ import type {
99
CompleteRequest,
1010
GetPromptRequest,
1111
Implementation,
12+
IncompleteResult,
13+
JSONRPCErrorResponse,
1214
JSONRPCRequest,
15+
JSONRPCResultResponse,
1316
JsonSchemaType,
1417
JsonSchemaValidator,
1518
jsonSchemaValidator,
@@ -22,12 +25,15 @@ import type {
2225
LoggingLevel,
2326
MessageExtraInfo,
2427
NotificationMethod,
28+
NotificationOptions,
2529
ProtocolOptions,
2630
ReadResourceRequest,
31+
Request,
2732
RequestMethod,
2833
RequestOptions,
2934
Result,
3035
ServerCapabilities,
36+
StandardSchemaV1,
3137
SubscribeRequest,
3238
Tool,
3339
Transport,
@@ -44,6 +50,7 @@ import {
4450
EmptyResultSchema,
4551
GetPromptResultSchema,
4652
InitializeResultSchema,
53+
isJSONRPCErrorResponse,
4754
LATEST_PROTOCOL_VERSION,
4855
ListChangedOptionsBaseSchema,
4956
ListPromptsResultSchema,
@@ -55,12 +62,16 @@ import {
5562
Protocol,
5663
ProtocolError,
5764
ProtocolErrorCode,
65+
RAW_RESULT_SCHEMA,
5866
ReadResourceResultSchema,
5967
SdkError,
60-
SdkErrorCode
68+
SdkErrorCode,
69+
validateStandardSchema
6170
} from '@modelcontextprotocol/core';
6271

6372
import { ExperimentalClientTasks } from '../experimental/tasks/client.js';
73+
import type { ClientTransport } from './clientTransport.js';
74+
import { isClientTransport } from './clientTransport.js';
6475

6576
/**
6677
* Elicitation default application helper. Applies defaults to the `data` based on the `schema`.
@@ -179,8 +190,29 @@ export type ClientOptions = ProtocolOptions & {
179190
* ```
180191
*/
181192
listChanged?: ListChangedHandlers;
193+
194+
/**
195+
* SEP-2322: maximum number of incomplete-result rounds before failing. Each round
196+
* services the server's `inputRequests` via local handlers and re-sends with
197+
* `params.{inputResponses, requestState}`. Prevents unbounded looping on a server
198+
* that returns `incomplete` forever.
199+
*
200+
* @default 16
201+
*/
202+
mrtrMaxRounds?: number;
182203
};
183204

205+
const DEFAULT_MRTR_MAX_ROUNDS = 16;
206+
207+
/** SEP-2322 client-side detection. The server signals it cannot complete without client input. */
208+
function isIncompleteResult(r: unknown): r is IncompleteResult {
209+
if (typeof r !== 'object' || r === null) return false;
210+
const o = r as { resultType?: unknown; inputRequests?: unknown };
211+
if (o.resultType !== 'incomplete') return false;
212+
if (o.inputRequests === undefined) return true;
213+
return typeof o.inputRequests === 'object' && o.inputRequests !== null && !Array.isArray(o.inputRequests);
214+
}
215+
184216
/**
185217
* An MCP client on top of a pluggable transport.
186218
*
@@ -223,6 +255,12 @@ export class Client extends Protocol<ClientContext> {
223255
private _listChangedDebounceTimers: Map<string, ReturnType<typeof setTimeout>> = new Map();
224256
private _pendingListChangedConfig?: ListChangedHandlers;
225257
private _enforceStrictCapabilities: boolean;
258+
/** Set when {@linkcode connect} was given a request-shaped {@linkcode ClientTransport}. */
259+
private _clientTransport?: ClientTransport;
260+
/** Monotonic id counter for the request-shaped path. */
261+
private _ctRequestId = 0;
262+
/** SEP-2322: maximum incomplete-result rounds before failing. */
263+
private readonly _mrtrMaxRounds: number;
226264

227265
/**
228266
* Initializes this client with the given name and version information.
@@ -235,6 +273,7 @@ export class Client extends Protocol<ClientContext> {
235273
this._capabilities = options?.capabilities ? { ...options.capabilities } : {};
236274
this._jsonSchemaValidator = options?.jsonSchemaValidator ?? new DefaultJsonSchemaValidator();
237275
this._enforceStrictCapabilities = options?.enforceStrictCapabilities ?? false;
276+
this._mrtrMaxRounds = options?.mrtrMaxRounds ?? DEFAULT_MRTR_MAX_ROUNDS;
238277

239278
// Store list changed config for setup after connection (when we know server capabilities)
240279
if (options?.listChanged) {
@@ -297,7 +336,7 @@ export class Client extends Protocol<ClientContext> {
297336
* The new capabilities will be merged with any existing capabilities previously given (e.g., at initialization).
298337
*/
299338
public registerCapabilities(capabilities: ClientCapabilities): void {
300-
if (this.transport) {
339+
if (this.transport || this._clientTransport) {
301340
throw new Error('Cannot register capabilities after connecting to transport');
302341
}
303342

@@ -429,7 +468,27 @@ export class Client extends Protocol<ClientContext> {
429468
* }
430469
* ```
431470
*/
432-
override async connect(transport: Transport, options?: RequestOptions): Promise<void> {
471+
override async connect(transport: Transport | ClientTransport, options?: RequestOptions): Promise<void> {
472+
if (isClientTransport(transport)) {
473+
// Request-shaped transport: bypass Protocol's pipe wiring. _requestWithSchema
474+
// routes via _clientTransport.fetch() per call; inbound server requests are
475+
// serviced via _serviceInputRequests over the SEP-2322 retry loop.
476+
this._clientTransport = transport;
477+
await this._initializeOrDiscover(options);
478+
// Wire the optional unsolicited stream so list-changed handlers can fire between fetches.
479+
if (transport.subscribe) {
480+
void (async () => {
481+
try {
482+
for await (const n of transport.subscribe!({ onrequest: r => this._serviceInboundRequest(r) })) {
483+
await this.dispatchNotification(n).catch(error => this.onerror?.(error as Error));
484+
}
485+
} catch (error) {
486+
this.onerror?.(error as Error);
487+
}
488+
})();
489+
}
490+
return;
491+
}
433492
await super.connect(transport);
434493
// When transport sessionId is already set this means we are trying to reconnect.
435494
// Restore the protocol version negotiated during the original initialize handshake
@@ -472,9 +531,14 @@ export class Client extends Protocol<ClientContext> {
472531

473532
this._instructions = result.instructions;
474533

475-
await this.notification({
476-
method: 'notifications/initialized'
477-
});
534+
// SEP-2575: post-initialize handshake notification only on legacy protocol
535+
// versions. 2026-06-30+ servers do not require it (initialize itself is
536+
// optional there); skipping avoids an extra round-trip on stateless servers.
537+
if (this._negotiatedProtocolVersion < '2026-06-30') {
538+
await this.notification({
539+
method: 'notifications/initialized'
540+
});
541+
}
478542

479543
// Set up list changed handlers now that we know server capabilities
480544
if (this._pendingListChangedConfig) {
@@ -997,4 +1061,180 @@ export class Client extends Protocol<ClientContext> {
9971061
async sendRootsListChanged() {
9981062
return this.notification({ method: 'notifications/roots/list_changed' });
9991063
}
1064+
1065+
/**
1066+
* Runs the SEP-2322 multi-round-trip retry loop around outbound requests, then validates
1067+
* the final result against `resultSchema`. On the pipe-transport path it delegates each
1068+
* round to `super._requestWithSchema(req, RAW_RESULT_SCHEMA)`; on the request-shaped path
1069+
* it calls {@linkcode ClientTransport.fetch}.
1070+
*/
1071+
protected override async _requestWithSchema<T extends StandardSchemaV1>(
1072+
req: Request,
1073+
resultSchema: T,
1074+
options?: RequestOptions
1075+
): Promise<StandardSchemaV1.InferOutput<T>> {
1076+
const overallStart = Date.now();
1077+
let inputResponses: Record<string, unknown> | undefined;
1078+
let requestState: string | undefined;
1079+
for (let round = 0; round < this._mrtrMaxRounds; round++) {
1080+
const remainingTotal =
1081+
options?.maxTotalTimeout === undefined ? undefined : Math.max(0, options.maxTotalTimeout - (Date.now() - overallStart));
1082+
if (remainingTotal !== undefined && remainingTotal <= 0) {
1083+
throw new SdkError(SdkErrorCode.RequestTimeout, 'Request timed out', { maxTotalTimeout: options?.maxTotalTimeout });
1084+
}
1085+
const params =
1086+
round > 0
1087+
? { ...(req.params as Record<string, unknown> | undefined), inputResponses, requestState }
1088+
: (req.params as Record<string, unknown> | undefined);
1089+
const roundOpts: RequestOptions = {
1090+
...options,
1091+
maxTotalTimeout: remainingTotal,
1092+
// resumptionToken is only meaningful for round 0; subsequent rounds carry
1093+
// fresh inputResponses in the body and must POST, not resume the prior stream.
1094+
resumptionToken: round === 0 ? options?.resumptionToken : undefined
1095+
};
1096+
const raw = await this._requestRoundRaw({ method: req.method, params }, roundOpts);
1097+
if (isIncompleteResult(raw)) {
1098+
requestState = raw.requestState;
1099+
if (raw.inputRequests) {
1100+
inputResponses = { ...inputResponses, ...(await this._serviceInputRequests(raw.inputRequests, options?.signal)) };
1101+
}
1102+
continue;
1103+
}
1104+
const v = await validateStandardSchema(resultSchema, raw);
1105+
if (!v.success) {
1106+
throw new SdkError(SdkErrorCode.InvalidResult, `Invalid ${req.method} result: ${v.error}`);
1107+
}
1108+
return v.data;
1109+
}
1110+
throw new ProtocolError(ProtocolErrorCode.InternalError, `MRTR exceeded ${this._mrtrMaxRounds} rounds for ${req.method}`);
1111+
}
1112+
1113+
/** One round of {@linkcode _requestWithSchema}: returns the raw (unvalidated) result. */
1114+
private async _requestRoundRaw(req: Request, options?: RequestOptions): Promise<unknown> {
1115+
if (this._clientTransport) {
1116+
if (this._enforceStrictCapabilities) this.assertCapabilityForMethod(req.method);
1117+
const id = this._ctRequestId++;
1118+
const meta = {
1119+
...(req.params?._meta as Record<string, unknown> | undefined),
1120+
...(options?.onprogress ? { progressToken: id } : {})
1121+
};
1122+
const params: Record<string, unknown> | undefined =
1123+
req.params || Object.keys(meta).length > 0
1124+
? { ...(req.params as Record<string, unknown> | undefined), _meta: Object.keys(meta).length > 0 ? meta : undefined }
1125+
: undefined;
1126+
const resp = await this._clientTransport.fetch(
1127+
{ jsonrpc: '2.0', id, method: req.method, params },
1128+
{
1129+
signal: options?.signal,
1130+
onprogress: options?.onprogress,
1131+
timeout: options?.timeout,
1132+
resetTimeoutOnProgress: options?.resetTimeoutOnProgress,
1133+
maxTotalTimeout: options?.maxTotalTimeout,
1134+
relatedRequestId: options?.relatedRequestId,
1135+
resumptionToken: options?.resumptionToken,
1136+
onresumptiontoken: options?.onresumptiontoken,
1137+
onnotification: n => void this.dispatchNotification(n).catch(error => this.onerror?.(error as Error)),
1138+
onrequest: r => this._serviceInboundRequest(r, options?.signal)
1139+
}
1140+
);
1141+
if (isJSONRPCErrorResponse(resp)) {
1142+
throw ProtocolError.fromError(resp.error.code, resp.error.message, resp.error.data);
1143+
}
1144+
return resp.result;
1145+
}
1146+
return super._requestWithSchema(req, RAW_RESULT_SCHEMA, options);
1147+
}
1148+
1149+
/**
1150+
* Service the SEP-2322 `inputRequests` by dispatching each as a synthetic inbound
1151+
* request through this client's own handler registry (sampling/elicitation/roots).
1152+
* Handler errors propagate (aborting the MRTR round) so the server is not retried
1153+
* with a partial response set.
1154+
*/
1155+
private async _serviceInputRequests(
1156+
reqs: NonNullable<IncompleteResult['inputRequests']>,
1157+
signal?: AbortSignal
1158+
): Promise<Record<string, unknown>> {
1159+
const out: Record<string, unknown> = {};
1160+
for (const [key, ir] of Object.entries(reqs)) {
1161+
signal?.throwIfAborted();
1162+
const resp = await this._serviceInboundRequest(
1163+
{ jsonrpc: '2.0', id: `mrtr:${key}`, method: ir.method, params: ir.params },
1164+
signal
1165+
);
1166+
if (isJSONRPCErrorResponse(resp)) {
1167+
throw ProtocolError.fromError(resp.error.code, resp.error.message, resp.error.data);
1168+
}
1169+
out[key] = resp.result;
1170+
}
1171+
return out;
1172+
}
1173+
1174+
/** Dispatch one server-initiated request through this client's handler registry. */
1175+
private async _serviceInboundRequest(r: JSONRPCRequest, signal?: AbortSignal): Promise<JSONRPCResultResponse | JSONRPCErrorResponse> {
1176+
let final: JSONRPCResultResponse | JSONRPCErrorResponse | undefined;
1177+
for await (const out of this.dispatch(r, { signal })) {
1178+
if (out.kind === 'response') final = out.message as JSONRPCResultResponse | JSONRPCErrorResponse;
1179+
}
1180+
if (!final) {
1181+
return { jsonrpc: '2.0', id: r.id, error: { code: ProtocolErrorCode.InternalError, message: 'dispatch yielded no response' } };
1182+
}
1183+
return final;
1184+
}
1185+
1186+
/** Initialize handshake on the request-shaped {@linkcode ClientTransport} path. */
1187+
private async _initializeOrDiscover(options?: RequestOptions): Promise<void> {
1188+
try {
1189+
const result = await this._requestWithSchema(
1190+
{
1191+
method: 'initialize',
1192+
params: {
1193+
protocolVersion: this._supportedProtocolVersions[0] ?? LATEST_PROTOCOL_VERSION,
1194+
capabilities: this._capabilities,
1195+
clientInfo: this._clientInfo
1196+
}
1197+
},
1198+
InitializeResultSchema,
1199+
options
1200+
);
1201+
if (!this._supportedProtocolVersions.includes(result.protocolVersion)) {
1202+
throw new Error(`Server's protocol version is not supported: ${result.protocolVersion}`);
1203+
}
1204+
this._serverCapabilities = result.capabilities;
1205+
this._serverVersion = result.serverInfo;
1206+
this._negotiatedProtocolVersion = result.protocolVersion;
1207+
this._instructions = result.instructions;
1208+
this._clientTransport?.setProtocolVersion?.(result.protocolVersion);
1209+
if (this._negotiatedProtocolVersion < '2026-06-30') {
1210+
await this.notification({ method: 'notifications/initialized' });
1211+
}
1212+
if (this._pendingListChangedConfig) {
1213+
this._setupListChangedHandlers(this._pendingListChangedConfig);
1214+
this._pendingListChangedConfig = undefined;
1215+
}
1216+
} catch (error) {
1217+
void this.close();
1218+
throw error;
1219+
}
1220+
}
1221+
1222+
override async notification(notification: ClientNotification, options?: NotificationOptions): Promise<void> {
1223+
if (this._clientTransport) {
1224+
this.assertNotificationCapability(notification.method as NotificationMethod);
1225+
return this._clientTransport.notify(notification);
1226+
}
1227+
return super.notification(notification, options);
1228+
}
1229+
1230+
override async close(): Promise<void> {
1231+
if (this._clientTransport) {
1232+
const ct = this._clientTransport;
1233+
this._clientTransport = undefined;
1234+
await ct.close();
1235+
this.onclose?.();
1236+
return;
1237+
}
1238+
return super.close();
1239+
}
10001240
}

0 commit comments

Comments
 (0)