Skip to content

Commit 71a9260

Browse files
refactor: replace _authRetryInFlight class field with isAuthRetry parameter
Stops the 10-comment whack-a-mole around flag lifecycle. A mutable boolean class field is the wrong primitive for 'retry once per operation' when operations are concurrent and recursive — every reset point creates a race, every missed reset creates a stuck flag. Now all four 401 paths use parameter-passed isAuthRetry: - StreamableHTTP _startOrAuthSse(options, isAuthRetry = false): recursion passes true. No class field, no reset sites. - StreamableHTTP send() delegates to private _send(message, options, isAuthRetry). Recursion passes true. No class field. - SSE _startOrAuth(isAuthRetry = false): onerror callback captures isAuthRetry from closure; retry calls _startOrAuth(true). - SSE send() delegates to private _send(message, isAuthRetry). Per-operation state dies with the stack frame. Concurrent operations cannot observe each other's retry state. 12 reset sites deleted. Also makes SSE onerror fallback consistent with other paths — throws SdkError(ClientHttpAuthentication) for the circuit-breaker case instead of plain UnauthorizedError. Not addressed (noted for auth() cleanup): concurrent 401s still each call onUnauthorized() independently. Deduplicating that (in-flight promise pattern) would be a behavior change.
1 parent b31d8d9 commit 71a9260

File tree

2 files changed

+31
-34
lines changed

2 files changed

+31
-34
lines changed

packages/client/src/client/sse.ts

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ export class SSEClientTransport implements Transport {
9898
this._fetchWithInit = createFetchWithInit(opts?.fetch, opts?.requestInit);
9999
}
100100

101-
private _authRetryInFlight = false;
102101
private _last401Response?: Response;
103102

104103
private async _commonHeaders(): Promise<Headers> {
@@ -119,7 +118,7 @@ export class SSEClientTransport implements Transport {
119118
});
120119
}
121120

122-
private _startOrAuth(): Promise<void> {
121+
private _startOrAuth(isAuthRetry = false): Promise<void> {
123122
const fetchImpl = (this?._eventSourceInit?.fetch ?? this._fetch ?? fetch) as typeof fetch;
124123
return new Promise((resolve, reject) => {
125124
this._eventSource = new EventSource(this._url.href, {
@@ -148,22 +147,22 @@ export class SSEClientTransport implements Transport {
148147

149148
this._eventSource.onerror = event => {
150149
if (event.code === 401 && this._authProvider) {
151-
if (this._authProvider.onUnauthorized && this._last401Response && !this._authRetryInFlight) {
152-
this._authRetryInFlight = true;
150+
if (this._authProvider.onUnauthorized && this._last401Response && !isAuthRetry) {
153151
const response = this._last401Response;
154152
this._authProvider
155153
.onUnauthorized({ response, serverUrl: this._url, fetchFn: this._fetchWithInit })
156-
.then(() => this._startOrAuth())
154+
.then(() => this._startOrAuth(true))
157155
.then(resolve, error => {
158156
this.onerror?.(error);
159157
reject(error);
160-
})
161-
.finally(() => {
162-
this._authRetryInFlight = false;
163158
});
164159
return;
165160
}
166-
const error = new UnauthorizedError();
161+
const error = isAuthRetry
162+
? new SdkError(SdkErrorCode.ClientHttpAuthentication, 'Server returned 401 after re-authentication', {
163+
status: 401
164+
})
165+
: new UnauthorizedError();
167166
reject(error);
168167
this.onerror?.(error);
169168
return;
@@ -247,6 +246,10 @@ export class SSEClientTransport implements Transport {
247246
}
248247

249248
async send(message: JSONRPCMessage): Promise<void> {
249+
return this._send(message, false);
250+
}
251+
252+
private async _send(message: JSONRPCMessage, isAuthRetry: boolean): Promise<void> {
250253
if (!this._endpoint) {
251254
throw new SdkError(SdkErrorCode.NotConnected, 'Not connected');
252255
}
@@ -271,19 +274,18 @@ export class SSEClientTransport implements Transport {
271274
this._scope = scope;
272275
}
273276

274-
if (this._authProvider.onUnauthorized && !this._authRetryInFlight) {
275-
this._authRetryInFlight = true;
277+
if (this._authProvider.onUnauthorized && !isAuthRetry) {
276278
await this._authProvider.onUnauthorized({
277279
response,
278280
serverUrl: this._url,
279281
fetchFn: this._fetchWithInit
280282
});
281283
await response.text?.().catch(() => {});
282284
// Purposely _not_ awaited, so we don't call onerror twice
283-
return this.send(message);
285+
return this._send(message, true);
284286
}
285287
await response.text?.().catch(() => {});
286-
if (this._authRetryInFlight) {
288+
if (isAuthRetry) {
287289
throw new SdkError(SdkErrorCode.ClientHttpAuthentication, 'Server returned 401 after re-authentication', {
288290
status: 401
289291
});
@@ -295,12 +297,9 @@ export class SSEClientTransport implements Transport {
295297
throw new Error(`Error POSTing to endpoint (HTTP ${response.status}): ${text}`);
296298
}
297299

298-
this._authRetryInFlight = false;
299-
300300
// Release connection - POST responses don't have content we need
301301
await response.text?.().catch(() => {});
302302
} catch (error) {
303-
this._authRetryInFlight = false;
304303
this.onerror?.(error as Error);
305304
throw error;
306305
}

packages/client/src/client/streamableHttp.ts

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ export class StreamableHTTPClientTransport implements Transport {
148148
private _sessionId?: string;
149149
private _reconnectionOptions: StreamableHTTPReconnectionOptions;
150150
private _protocolVersion?: string;
151-
private _authRetryInFlight = false; // Circuit breaker for send() 401 retry
152-
private _sseAuthRetryInFlight = false; // Circuit breaker for _startOrAuthSse() 401 retry — separate so concurrent GET/POST 401s don't interfere
153151
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
154152
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
155153
private _reconnectionTimeout?: ReturnType<typeof setTimeout>;
@@ -198,7 +196,7 @@ export class StreamableHTTPClientTransport implements Transport {
198196
});
199197
}
200198

201-
private async _startOrAuthSse(options: StartSSEOptions): Promise<void> {
199+
private async _startOrAuthSse(options: StartSSEOptions, isAuthRetry = false): Promise<void> {
202200
const { resumptionToken } = options;
203201

204202
try {
@@ -227,19 +225,18 @@ export class StreamableHTTPClientTransport implements Transport {
227225
this._scope = scope;
228226
}
229227

230-
if (this._authProvider.onUnauthorized && !this._sseAuthRetryInFlight) {
231-
this._sseAuthRetryInFlight = true;
228+
if (this._authProvider.onUnauthorized && !isAuthRetry) {
232229
await this._authProvider.onUnauthorized({
233230
response,
234231
serverUrl: this._url,
235232
fetchFn: this._fetchWithInit
236233
});
237234
await response.text?.().catch(() => {});
238235
// Purposely _not_ awaited, so we don't call onerror twice
239-
return this._startOrAuthSse(options);
236+
return this._startOrAuthSse(options, true);
240237
}
241238
await response.text?.().catch(() => {});
242-
if (this._sseAuthRetryInFlight) {
239+
if (isAuthRetry) {
243240
throw new SdkError(SdkErrorCode.ClientHttpAuthentication, 'Server returned 401 after re-authentication', {
244241
status: 401
245242
});
@@ -252,7 +249,6 @@ export class StreamableHTTPClientTransport implements Transport {
252249
// 405 indicates that the server does not offer an SSE stream at GET endpoint
253250
// This is an expected case that should not trigger an error
254251
if (response.status === 405) {
255-
this._sseAuthRetryInFlight = false;
256252
return;
257253
}
258254

@@ -262,10 +258,8 @@ export class StreamableHTTPClientTransport implements Transport {
262258
});
263259
}
264260

265-
this._sseAuthRetryInFlight = false;
266261
this._handleSseStream(response.body, options, true);
267262
} catch (error) {
268-
this._sseAuthRetryInFlight = false;
269263
this.onerror?.(error as Error);
270264
throw error;
271265
}
@@ -475,6 +469,14 @@ export class StreamableHTTPClientTransport implements Transport {
475469
async send(
476470
message: JSONRPCMessage | JSONRPCMessage[],
477471
options?: { resumptionToken?: string; onresumptiontoken?: (token: string) => void }
472+
): Promise<void> {
473+
return this._send(message, options, false);
474+
}
475+
476+
private async _send(
477+
message: JSONRPCMessage | JSONRPCMessage[],
478+
options: { resumptionToken?: string; onresumptiontoken?: (token: string) => void } | undefined,
479+
isAuthRetry: boolean
478480
): Promise<void> {
479481
try {
480482
const { resumptionToken, onresumptiontoken } = options || {};
@@ -516,19 +518,18 @@ export class StreamableHTTPClientTransport implements Transport {
516518
this._scope = scope;
517519
}
518520

519-
if (this._authProvider.onUnauthorized && !this._authRetryInFlight) {
520-
this._authRetryInFlight = true;
521+
if (this._authProvider.onUnauthorized && !isAuthRetry) {
521522
await this._authProvider.onUnauthorized({
522523
response,
523524
serverUrl: this._url,
524525
fetchFn: this._fetchWithInit
525526
});
526527
await response.text?.().catch(() => {});
527528
// Purposely _not_ awaited, so we don't call onerror twice
528-
return this.send(message, options);
529+
return this._send(message, options, true);
529530
}
530531
await response.text?.().catch(() => {});
531-
if (this._authRetryInFlight) {
532+
if (isAuthRetry) {
532533
throw new SdkError(SdkErrorCode.ClientHttpAuthentication, 'Server returned 401 after re-authentication', {
533534
status: 401
534535
});
@@ -573,7 +574,7 @@ export class StreamableHTTPClientTransport implements Transport {
573574
throw new UnauthorizedError();
574575
}
575576

576-
return this.send(message, options);
577+
return this._send(message, options, isAuthRetry);
577578
}
578579
}
579580

@@ -583,8 +584,6 @@ export class StreamableHTTPClientTransport implements Transport {
583584
});
584585
}
585586

586-
// Reset auth loop flag on successful response
587-
this._authRetryInFlight = false;
588587
this._lastUpscopingHeader = undefined;
589588

590589
// If the response is 202 Accepted, there's no body to process
@@ -634,7 +633,6 @@ export class StreamableHTTPClientTransport implements Transport {
634633
await response.text?.().catch(() => {});
635634
}
636635
} catch (error) {
637-
this._authRetryInFlight = false;
638636
this.onerror?.(error as Error);
639637
throw error;
640638
}

0 commit comments

Comments
 (0)