Skip to content

Commit e79ff68

Browse files
fix(mongodb): prevent webhook response stream close race (#33)
1 parent 483d576 commit e79ff68

2 files changed

Lines changed: 91 additions & 12 deletions

File tree

packages/mongodb/src/streamer.ts

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -241,15 +241,33 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{
241241
// Buffer for chunks that arrive during initial load
242242
const bufferedEventChunks: StreamChunk[] = [];
243243
let isLoadingFromStorage = true;
244+
let closeRequested = false;
245+
let isClosed = false;
244246

245247
// Helper to convert MongoDB Binary to Uint8Array
246248
const toUint8Array = (data: Uint8Array | unknown): Uint8Array => {
247249
if (data instanceof Uint8Array) return data;
248250
return new Uint8Array((data as any).buffer || data);
249251
};
250252

253+
const closeController = () => {
254+
if (isClosed) {
255+
return;
256+
}
257+
isClosed = true;
258+
cleanup();
259+
try {
260+
controller.close();
261+
} catch {
262+
// Ignore if already closed
263+
}
264+
};
265+
251266
// Handler for new chunks (real-time)
252267
const chunkHandler = (chunk: StreamChunk) => {
268+
if (isClosed) {
269+
return;
270+
}
253271
// Skip if already delivered
254272
if (deliveredChunkIds.has(chunk.chunkId)) {
255273
return;
@@ -278,12 +296,13 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{
278296

279297
// Handler for stream close
280298
const closeHandler = () => {
281-
cleanup();
282-
try {
283-
controller.close();
284-
} catch {
285-
// Ignore if already closed
299+
// If close arrives while we're still loading persisted chunks,
300+
// defer closure until we flush buffered data.
301+
if (isLoadingFromStorage) {
302+
closeRequested = true;
303+
return;
286304
}
305+
closeController();
287306
};
288307

289308
// Cleanup function
@@ -307,8 +326,7 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{
307326

308327
// Check for EOF
309328
if (chunk.eof) {
310-
cleanup();
311-
controller.close();
329+
closeController();
312330
return;
313331
}
314332

@@ -338,8 +356,7 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{
338356

339357
for (const chunk of bufferedEventChunks) {
340358
if (chunk.eof) {
341-
cleanup();
342-
controller.close();
359+
closeController();
343360
return;
344361
}
345362
const data = toUint8Array(chunk.data);
@@ -350,9 +367,8 @@ export async function createStreamer(config: StreamerConfig = {}): Promise<{
350367

351368
// Check if we already received EOF while loading
352369
const lastChunk = existingChunks[existingChunks.length - 1];
353-
if (lastChunk?.eof) {
354-
cleanup();
355-
controller.close();
370+
if (closeRequested || lastChunk?.eof) {
371+
closeController();
356372
}
357373
},
358374

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import type { Streamer } from '@workflow/world';
2+
import { beforeAll, describe, expect, test } from 'vitest';
3+
import { createStreamer } from './setup.js';
4+
5+
async function readText(stream: ReadableStream<Uint8Array>): Promise<string> {
6+
const reader = stream.getReader();
7+
const chunks: Uint8Array[] = [];
8+
9+
try {
10+
while (true) {
11+
const { done, value } = await reader.read();
12+
if (done) break;
13+
if (value) chunks.push(value);
14+
}
15+
} finally {
16+
reader.releaseLock();
17+
}
18+
19+
const bytes = new Uint8Array(chunks.flatMap((chunk) => Array.from(chunk)));
20+
return new TextDecoder().decode(bytes);
21+
}
22+
23+
function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
24+
let timeout: ReturnType<typeof setTimeout> | undefined;
25+
const timeoutPromise = new Promise<never>((_, reject) => {
26+
timeout = setTimeout(() => {
27+
reject(new Error(`Timed out after ${timeoutMs}ms`));
28+
}, timeoutMs);
29+
});
30+
return Promise.race([promise, timeoutPromise]).finally(() => {
31+
if (timeout) {
32+
clearTimeout(timeout);
33+
}
34+
});
35+
}
36+
37+
describe('streamer race conditions', () => {
38+
let streamer: Streamer;
39+
40+
beforeAll(async () => {
41+
({ streamer } = await createStreamer());
42+
});
43+
44+
test('does not drop chunk when close happens immediately after write', async () => {
45+
const expected = 'Hello from webhook!';
46+
const encoder = new TextEncoder();
47+
48+
for (let i = 0; i < 50; i++) {
49+
const id = `${Date.now()}-${i}`;
50+
const streamName = `test-stream-race-${id}`;
51+
const runId = `wrun_test-${id}`;
52+
53+
const readable = await streamer.readFromStream(streamName);
54+
const readPromise = withTimeout(readText(readable), 5000);
55+
56+
await streamer.writeToStream(streamName, runId, encoder.encode(expected));
57+
await streamer.closeStream(streamName, runId);
58+
59+
const result = await readPromise;
60+
expect(result).toBe(expected);
61+
}
62+
});
63+
});

0 commit comments

Comments
 (0)