Skip to content

Commit 3737caa

Browse files
[backport] [world-vercel] Use stream control frame for transparent reconnection (#1766)
1 parent 41dd333 commit 3737caa

3 files changed

Lines changed: 348 additions & 14 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@workflow/world-vercel': patch
3+
---
4+
5+
Use the stream control frame to transparently reconnect on server timeout

packages/world-vercel/src/streamer.test.ts

Lines changed: 201 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { afterEach, describe, expect, it, vi } from 'vitest';
2-
import { encodeMultiChunks, MAX_CHUNKS_PER_REQUEST } from './streamer.js';
2+
import {
3+
encodeMultiChunks,
4+
MAX_CHUNKS_PER_REQUEST,
5+
parseStreamControlFrame,
6+
STREAM_CONTROL_FRAME_SIZE,
7+
} from './streamer.js';
38

49
describe('encodeMultiChunks', () => {
510
/**
@@ -269,3 +274,198 @@ describe('writeToStreamMulti pagination', () => {
269274
]);
270275
});
271276
});
277+
278+
/**
 * Assemble a control frame in the workflow-server layout for use in tests.
 *
 * Layout: four zero marker bytes, one flags byte (bit 0 = done), a
 * big-endian uint32 `nextIndex`, then the ASCII magic footer "WFCT".
 *
 * @param done - Value for the "done" flag bit.
 * @param nextIndex - Chunk index written as a big-endian uint32.
 * @returns A freshly allocated frame of STREAM_CONTROL_FRAME_SIZE bytes.
 */
function buildControlFrame(done: boolean, nextIndex: number): Uint8Array {
  const bytes = new Uint8Array(STREAM_CONTROL_FRAME_SIZE);
  const writer = new DataView(bytes.buffer);

  // Bytes 0-3 are the zero-frame marker; a new Uint8Array is zero-filled.
  writer.setUint8(4, done ? 1 : 0);
  writer.setUint32(5, nextIndex, false);

  // Bytes 9-12: magic footer "WFCT".
  const footer = [0x57, 0x46, 0x43, 0x54];
  footer.forEach((byte, i) => {
    writer.setUint8(9 + i, byte);
  });

  return bytes;
}
290+
291+
// Unit tests for parseStreamControlFrame: valid frames, frames that trail
// payload bytes, malformed frames, and boundary values of nextIndex.
describe('parseStreamControlFrame', () => {
  it('parses a valid done=true control frame', () => {
    const frame = buildControlFrame(true, 42);
    const result = parseStreamControlFrame(frame);
    expect(result).toEqual({
      done: true,
      nextIndex: 42,
      totalLength: STREAM_CONTROL_FRAME_SIZE,
    });
  });

  it('parses a valid done=false (timeout) control frame', () => {
    const frame = buildControlFrame(false, 100);
    const result = parseStreamControlFrame(frame);
    expect(result).toEqual({
      done: false,
      nextIndex: 100,
      totalLength: STREAM_CONTROL_FRAME_SIZE,
    });
  });

  it('parses control frame appended after data bytes', () => {
    const data = new Uint8Array([1, 2, 3, 4, 5]);
    const frame = buildControlFrame(false, 7);
    const combined = new Uint8Array(data.length + frame.length);
    combined.set(data, 0);
    combined.set(frame, data.length);

    const result = parseStreamControlFrame(combined);
    expect(result).toEqual({
      done: false,
      nextIndex: 7,
      totalLength: STREAM_CONTROL_FRAME_SIZE,
    });
  });

  it('parses a control frame from a view with a non-zero byteOffset', () => {
    // The parser builds a DataView over buffer.buffer, so it must add
    // buffer.byteOffset. Non-zero filler makes a wrong offset read garbage.
    const frame = buildControlFrame(false, 9);
    const backing = new Uint8Array(8 + frame.length).fill(0xaa);
    backing.set(frame, 8);
    const window = backing.subarray(3);

    expect(window.byteOffset).toBe(3);
    expect(parseStreamControlFrame(window)).toEqual({
      done: false,
      nextIndex: 9,
      totalLength: STREAM_CONTROL_FRAME_SIZE,
    });
  });

  it('returns null for buffer shorter than control frame size', () => {
    expect(parseStreamControlFrame(new Uint8Array(12))).toBeNull();
    expect(parseStreamControlFrame(new Uint8Array(0))).toBeNull();
  });

  it('returns null when magic footer does not match', () => {
    const frame = buildControlFrame(true, 0);
    frame[12] = 0xff; // corrupt magic footer
    expect(parseStreamControlFrame(frame)).toBeNull();
  });

  it('returns null when zero-frame marker is not all zeros', () => {
    const frame = buildControlFrame(true, 0);
    frame[0] = 1; // corrupt zero-frame marker
    expect(parseStreamControlFrame(frame)).toBeNull();
  });

  it('handles nextIndex=0', () => {
    const frame = buildControlFrame(false, 0);
    const result = parseStreamControlFrame(frame);
    expect(result).toEqual({
      done: false,
      nextIndex: 0,
      totalLength: STREAM_CONTROL_FRAME_SIZE,
    });
  });

  it('handles large nextIndex values', () => {
    const frame = buildControlFrame(true, 0xffffffff);
    const result = parseStreamControlFrame(frame);
    expect(result?.nextIndex).toBe(0xffffffff);
  });
});
360+
361+
// Integration-style tests for readFromStream's reconnect loop. `fetch` is
// stubbed so each call returns a canned Response body, optionally ending in
// a control frame.
describe('readFromStream reconnection', () => {
  /** Collect every byte from a ReadableStream into one Uint8Array. */
  async function drain(
    stream: ReadableStream<Uint8Array>
  ): Promise<Uint8Array> {
    const reader = stream.getReader();
    const parts: Uint8Array[] = [];
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      parts.push(value);
    }
    const len = parts.reduce((s, p) => s + p.length, 0);
    const out = new Uint8Array(len);
    let off = 0;
    for (const p of parts) {
      out.set(p, off);
      off += p.length;
    }
    return out;
  }

  /** Build a stream that yields one given chunk per pull, then closes. */
  function chunkedStream(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
    let i = 0;
    return new ReadableStream({
      pull(controller) {
        if (i < chunks.length) {
          controller.enqueue(chunks[i++]);
        } else {
          controller.close();
        }
      },
    });
  }

  /** Wrap chunks in a 200 octet-stream Response, as the stubbed fetch returns. */
  function streamResponse(...chunks: Uint8Array[]): Response {
    return new Response(chunkedStream(chunks), {
      status: 200,
      headers: { 'Content-Type': 'application/octet-stream' },
    });
  }

  /** Import the module lazily so the fetch spy is installed before it is used. */
  async function getStreamer() {
    const { createStreamer } = await import('./streamer.js');
    return createStreamer();
  }

  afterEach(() => {
    vi.restoreAllMocks();
  });

  it('reconnects when server sends done=false and resumes from nextIndex', async () => {
    const chunk1 = new TextEncoder().encode('aaa');
    const chunk2 = new TextEncoder().encode('bbb');
    const timeout = buildControlFrame(false, 3);
    const done = buildControlFrame(true, 6);

    const fetchSpy = vi
      .spyOn(globalThis, 'fetch')
      .mockResolvedValueOnce(streamResponse(chunk1, timeout))
      .mockResolvedValueOnce(streamResponse(chunk2, done));

    const streamer = await getStreamer();
    const result = await drain(await streamer.readFromStream('strm_test'));

    const expected = new Uint8Array([...chunk1, ...chunk2]);
    expect(result).toEqual(expected);
    expect(fetchSpy).toHaveBeenCalledTimes(2);

    // fetch receives a URL object, so stringify it rather than asserting
    // that it is a string.
    const firstUrl = new URL(String(fetchSpy.mock.calls[0][0]));
    expect(firstUrl.searchParams.get('startIndex')).toBe('0');

    const secondUrl = new URL(String(fetchSpy.mock.calls[1][0]));
    expect(secondUrl.searchParams.get('startIndex')).toBe('3');
  });

  it('falls through when no control frame is present (backward compat)', async () => {
    const data = new TextEncoder().encode('legacy server');

    vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce(streamResponse(data));

    const streamer = await getStreamer();
    const result = await drain(await streamer.readFromStream('strm_test'));

    expect(result).toEqual(data);
  });

  it('gives up after the maximum number of reconnection attempts', async () => {
    // Every response is a bare done=false frame, so the stream never
    // completes. A Response body is single-use, so build one per call.
    const fetchSpy = vi
      .spyOn(globalThis, 'fetch')
      .mockImplementation(async () =>
        streamResponse(buildControlFrame(false, 0))
      );

    const streamer = await getStreamer();
    await expect(streamer.readFromStream('strm_test')).rejects.toThrow(
      'Stream exceeded maximum reconnection attempts (50)'
    );
    // One initial request plus 50 reconnects; the 51st timeout throws.
    expect(fetchSpy).toHaveBeenCalledTimes(51);
  });

  it('propagates network error to consumer without retrying', async () => {
    const data = new TextEncoder().encode('partial');

    let callCount = 0;
    const errorStream = new ReadableStream<Uint8Array>({
      pull(controller) {
        if (callCount === 0) {
          callCount++;
          controller.enqueue(data);
        } else {
          controller.error(new Error('connection reset'));
        }
      },
    });

    vi.spyOn(globalThis, 'fetch').mockResolvedValueOnce(
      new Response(errorStream, { status: 200 })
    );

    const streamer = await getStreamer();
    // readFromStream reads the full response via arrayBuffer(), so
    // a mid-stream error rejects the readFromStream promise itself.
    await expect(streamer.readFromStream('strm_test')).rejects.toThrow(
      'connection reset'
    );
  });
});

packages/world-vercel/src/streamer.ts

Lines changed: 142 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,66 @@ export const MAX_CHUNKS_PER_REQUEST = 1000;
2323
// (partial writes, long-lived reads), and duplex streams are incompatible
2424
// with undici's experimental H2 support.
2525

26+
/**
 * Stream control frame constants, mirroring workflow-server's format.
 *
 * Control frame (13 bytes):
 *   [0-3]  Zero-frame marker (0x00 0x00 0x00 0x00)
 *   [4]    Flags — bit 0: done (1 = complete, 0 = timeout/reconnect)
 *   [5-8]  nextIndex — big-endian uint32, chunk index to resume from
 *   [9-12] Magic footer — "WFCT" (0x57 0x46 0x43 0x54)
 */
export const STREAM_CONTROL_FRAME_SIZE = 13;
const STREAM_CONTROL_MAGIC = new Uint8Array([0x57, 0x46, 0x43, 0x54]);

// Byte offsets of each field, relative to the start of the control frame.
const STREAM_CONTROL_MARKER_OFFSET = 0;
const STREAM_CONTROL_FLAGS_OFFSET = 4;
const STREAM_CONTROL_NEXT_INDEX_OFFSET = 5;
const STREAM_CONTROL_MAGIC_OFFSET = 9;
// Bit 0 of the flags byte: set when the stream is complete.
const STREAM_CONTROL_DONE_FLAG = 1;

export interface StreamControlFrame {
  /** True when the stream is complete; false when the caller should reconnect. */
  done: boolean;
  /** Chunk index to resume from on reconnect. */
  nextIndex: number;
}

/**
 * Try to parse a stream control frame from the tail of a buffer.
 *
 * Only the last STREAM_CONTROL_FRAME_SIZE bytes are inspected; any bytes
 * before them are treated as payload and ignored.
 *
 * @param buffer - Bytes that may end with a control frame. May be a view
 *   into a larger ArrayBuffer (non-zero `byteOffset` is handled).
 * @returns The parsed frame plus `totalLength` (the number of trailing bytes
 *   the control data occupies), or `null` when the buffer is too short or the
 *   zero-frame marker / magic footer do not match.
 */
export function parseStreamControlFrame(
  buffer: Uint8Array
): (StreamControlFrame & { totalLength: number }) | null {
  if (buffer.length < STREAM_CONTROL_FRAME_SIZE) return null;

  // View over just the candidate frame, so all reads below are frame-relative.
  const frame = new DataView(
    buffer.buffer,
    buffer.byteOffset + buffer.length - STREAM_CONTROL_FRAME_SIZE,
    STREAM_CONTROL_FRAME_SIZE
  );

  // Zero-frame marker: four 0x00 bytes, i.e. a uint32 equal to 0.
  if (frame.getUint32(STREAM_CONTROL_MARKER_OFFSET, false) !== 0) {
    return null;
  }

  // Magic footer must match byte-for-byte.
  for (let i = 0; i < STREAM_CONTROL_MAGIC.length; i++) {
    if (
      frame.getUint8(STREAM_CONTROL_MAGIC_OFFSET + i) !==
      STREAM_CONTROL_MAGIC[i]
    ) {
      return null;
    }
  }

  const flags = frame.getUint8(STREAM_CONTROL_FLAGS_OFFSET);
  const nextIndex = frame.getUint32(STREAM_CONTROL_NEXT_INDEX_OFFSET, false);

  return {
    done: (flags & STREAM_CONTROL_DONE_FLAG) === STREAM_CONTROL_DONE_FLAG,
    nextIndex,
    totalLength: STREAM_CONTROL_FRAME_SIZE,
  };
}
85+
2686
function getStreamUrl(
2787
name: string,
2888
runId: string | undefined,
@@ -188,21 +248,90 @@ export function createStreamer(config?: APIConfig): Streamer {
188248
},
189249

190250
/**
 * Read a named stream to completion, transparently reconnecting when the
 * server ends a response with a done=false control frame.
 *
 * Each response is read in full; a trailing control frame (if any) is
 * stripped and the remaining bytes are collected. The returned stream is
 * created only after the final response has been read, and emits the
 * collected parts in order before closing.
 *
 * @param name - Stream name; URL-encoded into the request path.
 * @param startIndex - Chunk index to start from (defaults to 0).
 * @returns A ReadableStream that yields the collected bytes and then closes.
 * @throws Error if a response is not OK, has no body, or the reconnect cap
 *   is exceeded. Errors from reading a response body propagate as-is.
 */
async readFromStream(name: string, startIndex?: number) {
  // Cap reconnections to prevent infinite loops if the server
  // never completes the stream. 50 reconnects at 2-min server
  // timeout ≈ 100 minutes of streaming.
  const MAX_RECONNECTS = 50;

  let currentStartIndex = startIndex ?? 0;

  // Open one request starting at the current resume index.
  const connect = async (): Promise<Response> => {
    const httpConfig = await getHttpConfig(config);
    // Use v3 to receive the stream control frame for reconnection.
    const url = new URL(
      `${httpConfig.baseUrl}/v3/stream/${encodeURIComponent(name)}`
    );
    url.searchParams.set('startIndex', String(currentStartIndex));
    const response = await fetch(url, {
      headers: httpConfig.headers,
    });
    if (!response.ok) {
      // Discard the unread body so the connection is released promptly
      // instead of waiting for garbage collection. Best-effort only: a
      // failure here must not mask the status error below.
      await response.body?.cancel().catch(() => {});
      throw new Error(`Failed to fetch stream: ${response.status}`);
    }
    if (!response.body) {
      throw new Error('No response body for stream');
    }
    return response;
  };

  // Read the entire response as bytes, then strip any trailing
  // control frame. This avoids the complexity of hold-back buffering
  // inside a ReadableStream pull loop, which interacts poorly with
  // the proxy and byte-stream wrappers in production.
  const readFull = async (): Promise<{
    data: Uint8Array;
    control: (StreamControlFrame & { totalLength: number }) | null;
  }> => {
    const response = await connect();
    const buffer = new Uint8Array(await response.arrayBuffer());
    const control = parseStreamControlFrame(buffer);
    if (!control) {
      return { data: buffer, control: null };
    }
    // A frame is only parsed when the buffer is at least totalLength
    // long, so the payload length below is never negative.
    return {
      data: buffer.subarray(0, buffer.length - control.totalLength),
      control,
    };
  };

  // Collect all data, transparently reconnecting on server timeout.
  const parts: Uint8Array[] = [];
  let reconnectCount = 0;
  for (;;) {
    const { data, control } = await readFull();
    if (data.length > 0) {
      parts.push(data);
    }

    if (!control || control.done) {
      // Stream complete or no control frame (older server).
      break;
    }

    // Timeout — reconnect from the next chunk index.
    reconnectCount++;
    if (reconnectCount > MAX_RECONNECTS) {
      throw new Error(
        `Stream exceeded maximum reconnection attempts (${MAX_RECONNECTS})`
      );
    }
    currentStartIndex = control.nextIndex;
  }

  // Everything is already buffered, so enqueue it all up front and close.
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const part of parts) {
        controller.enqueue(part);
      }
      controller.close();
    },
  });
},
207336

208337
async getStreamChunks(

0 commit comments

Comments
 (0)