Skip to content

Commit e384abb

Browse files
committed
fix: harden restored session manager sse flows
1 parent 7202770 commit e384abb

2 files changed

Lines changed: 104 additions & 1 deletion

File tree

src/tempo/client/SessionManager.test.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,13 @@ function makeSseResponse(events: string[]): Response {
5353
})
5454
}
5555

56+
function makeProblemResponse(status: number, body: Record<string, unknown>): Response {
57+
return new Response(JSON.stringify(body), {
58+
status,
59+
headers: { 'Content-Type': 'application/problem+json' },
60+
})
61+
}
62+
5663
describe('Session', () => {
5764
describe('parseEvent round-trip via SSE', () => {
5865
test('parses message events from SSE stream', () => {
@@ -530,6 +537,84 @@ describe('Session', () => {
530537
})
531538

532539
describe('.sse() event parsing', () => {
540+
test('preserves headers instances while adding SSE accept header', async () => {
541+
const mockFetch = vi.fn().mockResolvedValue(makeSseResponse([]))
542+
543+
const s = sessionManager({
544+
account: '0x0000000000000000000000000000000000000001',
545+
fetch: mockFetch as typeof globalThis.fetch,
546+
})
547+
548+
const body = new TextEncoder().encode('{"stream":true}').buffer
549+
await s.sse('https://api.example.com/stream', {
550+
method: 'POST',
551+
headers: new Headers({ 'Content-Type': 'application/json' }),
552+
body,
553+
})
554+
555+
const requestInit = mockFetch.mock.calls[0]?.[1]
556+
const headers = new Headers(requestInit?.headers)
557+
558+
expect(headers.get('Accept')).toBe('text/event-stream')
559+
expect(headers.get('Content-Type')).toBe('application/json')
560+
expect(requestInit?.body).toBe(body)
561+
})
562+
563+
test('rejects restored SSE when paid response remains a 402 problem response', async () => {
564+
vi.resetModules()
565+
566+
const createCredential = vi.fn().mockResolvedValue('voucher')
567+
const helperCreateCredential = vi.fn().mockResolvedValue('restore:5000000')
568+
569+
vi.doMock('./Session.js', () => ({
570+
session: vi.fn(() => ({
571+
createCredential,
572+
})),
573+
UnrecoverableRestoreError,
574+
}))
575+
576+
vi.doMock('../../client/internal/Fetch.js', () => ({
577+
from: ({
578+
fetch,
579+
onChallenge,
580+
}: {
581+
fetch: typeof globalThis.fetch
582+
onChallenge: Function
583+
}) => {
584+
return async (input: RequestInfo | URL, init?: RequestInit) => {
585+
await onChallenge(makeChallenge(), { createCredential: helperCreateCredential })
586+
return fetch(input, init)
587+
}
588+
},
589+
}))
590+
591+
try {
592+
const { sessionManager: isolatedSessionManager } = await import('./SessionManager.js')
593+
const mockFetch = vi.fn<typeof globalThis.fetch>().mockResolvedValue(
594+
makeProblemResponse(402, {
595+
type: 'https://example.com/problems/payment-required',
596+
title: 'Payment Required',
597+
detail: 'Session restore voucher was rejected',
598+
}),
599+
)
600+
601+
const s = isolatedSessionManager({
602+
account: '0x0000000000000000000000000000000000000001',
603+
fetch: mockFetch,
604+
restore: {
605+
channelId,
606+
cumulativeAmount: 5_000_000n,
607+
},
608+
})
609+
610+
await expect(s.sse('https://api.example.com/stream')).rejects.toThrow(/status 402/i)
611+
} finally {
612+
vi.doUnmock('./Session.js')
613+
vi.doUnmock('../../client/internal/Fetch.js')
614+
vi.resetModules()
615+
}
616+
})
617+
533618
test('restored sse resumes same channel when required cumulative exceeds current', async () => {
534619
vi.resetModules()
535620

src/tempo/client/SessionManager.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,14 @@ export function sessionManager(parameters: sessionManager.Parameters): SessionMa
157157
})
158158
}
159159

160+
async function throwForBadSseResponse(response: PaymentResponse): Promise<never> {
161+
const contentType = response.headers.get('Content-Type') ?? ''
162+
const body = await response.text().catch(() => '')
163+
throw new Error(
164+
`SSE request failed with status ${response.status}${contentType ? ` (${contentType})` : ''}${body ? `: ${body}` : ''}`,
165+
)
166+
}
167+
160168
async function doFetch(input: RequestInfo | URL, init?: RequestInit): Promise<PaymentResponse> {
161169
lastUrl = input
162170
const response = await wrappedFetch(input, init)
@@ -209,7 +217,6 @@ export function sessionManager(parameters: sessionManager.Parameters): SessionMa
209217

210218
async sse(input, init) {
211219
const { onReceipt, signal, ...fetchInit } = init ?? {}
212-
213220
const sseInit = {
214221
...fetchInit,
215222
headers: {
@@ -221,6 +228,10 @@ export function sessionManager(parameters: sessionManager.Parameters): SessionMa
221228

222229
const response = await doFetch(input, sseInit)
223230

231+
if (!response.ok) {
232+
await throwForBadSseResponse(response)
233+
}
234+
224235
// Snapshot the challenge at SSE open time so concurrent
225236
// calls don't overwrite it.
226237
const sseChallenge = lastChallenge
@@ -324,6 +335,13 @@ export function sessionManager(parameters: sessionManager.Parameters): SessionMa
324335
if (receiptHeader) receipt = deserializeSessionReceipt(receiptHeader)
325336
}
326337

338+
if (channel && activeChannelId === channel.channelId) {
339+
channel = {
340+
...channel,
341+
opened: false,
342+
}
343+
}
344+
327345
if (!channel && restored && activeChannelId === restored.channelId) {
328346
restored = null
329347
}

0 commit comments

Comments
 (0)