Skip to content

Commit c739b99

Browse files
authored
Fix step-level getWritable() stream timeout on Vercel (#770) (#1518)
Signed-off-by: ceolinwill <4393133+ceolinwill@users.noreply.github.com>
1 parent e045b59 commit c739b99

3 files changed

Lines changed: 134 additions & 5 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@workflow/core": patch
3+
---
4+
5+
Fix `getWritable()` in step functions to resolve on lock release instead of requiring stream close, preventing Vercel function timeouts
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
2+
import { LOCK_POLL_INTERVAL_MS } from '../flushable-stream.js';
3+
4+
vi.mock('../runtime/world.js', () => ({
5+
getWorld: vi.fn(),
6+
}));
7+
8+
describe('step-level getWritable', () => {
9+
beforeEach(async () => {
10+
const mockWorld = {
11+
writeToStream: vi.fn().mockResolvedValue(undefined),
12+
writeToStreamMulti: vi.fn().mockResolvedValue(undefined),
13+
closeStream: vi.fn().mockResolvedValue(undefined),
14+
};
15+
16+
const { getWorld } = await import('../runtime/world.js');
17+
(getWorld as ReturnType<typeof vi.fn>).mockReturnValue(mockWorld);
18+
});
19+
20+
afterEach(() => {
21+
vi.clearAllMocks();
22+
});
23+
24+
it('ops promise should resolve when writer lock is released (without closing stream)', async () => {
25+
const { contextStorage } = await import('./context-storage.js');
26+
27+
const ops: Promise<void>[] = [];
28+
const ctx = {
29+
stepMetadata: {
30+
stepName: 'test-step',
31+
stepId: 'step_001',
32+
stepStartedAt: new Date(),
33+
attempt: 1,
34+
},
35+
workflowMetadata: {
36+
workflowName: 'test-workflow',
37+
workflowRunId: 'wrun_test123',
38+
workflowStartedAt: new Date(),
39+
},
40+
ops,
41+
encryptionKey: undefined,
42+
};
43+
44+
const writable = await contextStorage.run(ctx, async () => {
45+
const { getWritable } = await import('./writable-stream.js');
46+
return getWritable<string>();
47+
});
48+
49+
// Simulate user pattern: write data, then release lock
50+
const writer = writable.getWriter();
51+
await writer.write('hello');
52+
await writer.write('world');
53+
writer.releaseLock();
54+
55+
// Without the fix (.pipeTo()), this hangs because pipeTo only resolves on stream close.
56+
// With flushablePipe + pollWritableLock, it resolves once the lock is released.
57+
await expect(
58+
Promise.race([
59+
Promise.all(ops),
60+
new Promise((_, r) =>
61+
setTimeout(
62+
() => r(new Error('ops did not resolve after releaseLock')),
63+
LOCK_POLL_INTERVAL_MS * 5 + 200
64+
)
65+
),
66+
])
67+
).resolves.not.toThrow();
68+
});
69+
70+
it('ops promise should resolve when stream is explicitly closed', async () => {
71+
const { contextStorage } = await import('./context-storage.js');
72+
73+
const ops: Promise<void>[] = [];
74+
const ctx = {
75+
stepMetadata: {
76+
stepName: 'test-step',
77+
stepId: 'step_001',
78+
stepStartedAt: new Date(),
79+
attempt: 1,
80+
},
81+
workflowMetadata: {
82+
workflowName: 'test-workflow',
83+
workflowRunId: 'wrun_test123',
84+
workflowStartedAt: new Date(),
85+
},
86+
ops,
87+
encryptionKey: undefined,
88+
};
89+
90+
const writable = await contextStorage.run(ctx, async () => {
91+
const { getWritable } = await import('./writable-stream.js');
92+
return getWritable<string>();
93+
});
94+
95+
const writer = writable.getWriter();
96+
await writer.write('data');
97+
await writer.close();
98+
99+
await expect(
100+
Promise.race([
101+
Promise.all(ops),
102+
new Promise((_, r) =>
103+
setTimeout(
104+
() => r(new Error('ops did not resolve after close')),
105+
LOCK_POLL_INTERVAL_MS * 5 + 200
106+
)
107+
),
108+
])
109+
).resolves.not.toThrow();
110+
});
111+
});

packages/core/src/step/writable-stream.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
import {
2+
createFlushableState,
3+
flushablePipe,
4+
pollWritableLock,
5+
} from '../flushable-stream.js';
16
import {
27
getExternalReducers,
38
getSerializeStream,
@@ -47,11 +52,19 @@ export function getWritable<W = any>(
4752
ctx.encryptionKey
4853
);
4954

50-
// Pipe the serialized data to the workflow server stream
51-
// Register this async operation with the runtime's ops array so it's awaited via waitUntil
52-
ctx.ops.push(
53-
serialize.readable.pipeTo(new WorkflowServerWritableStream(name, runId))
54-
);
55+
// Use flushable pipe so the ops promise resolves when the user releases
56+
// their writer lock, not only when the stream is explicitly closed.
57+
// Without this, Vercel functions hang until the runtime timeout because
58+
// .pipeTo() only resolves on stream close.
59+
const serverWritable = new WorkflowServerWritableStream(name, runId);
60+
const state = createFlushableState();
61+
ctx.ops.push(state.promise);
62+
63+
flushablePipe(serialize.readable, serverWritable, state).catch(() => {
64+
// Errors are handled via state.reject
65+
});
66+
67+
pollWritableLock(serialize.writable, state);
5568

5669
// Return the writable side of the transform stream
5770
return serialize.writable;

0 commit comments

Comments
 (0)