Skip to content

Commit 67def3c

Browse files
fix redis/turso webhook e2e (#29)
1 parent c47ec4c commit 67def3c

4 files changed

Lines changed: 28 additions & 11 deletions

File tree

.changeset/tame-bags-work.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
"@workflow-worlds/redis": patch
3+
"@workflow-worlds/turso": patch
4+
---
5+
6+
Fix webhook e2e regressions where webhook endpoints could return 404 in Redis and Turso worlds.
7+
8+
- Redis: avoid closing stream readers before final persisted chunks are drained.
9+
- Turso: avoid closing stream readers during initial replay before buffered chunks are delivered.
10+
- Turso: normalize hook metadata nulls to undefined to preserve expected hydration behavior.

packages/redis/src/streamer.ts

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ export async function createStreamer(options: {
161161
eof: boolean;
162162
}> = [];
163163
let isClosed = false;
164+
let closeSignalReceived = false;
164165

165166
// Handler for pub/sub messages
166167
const messageHandler = (channel: string, message: string) => {
@@ -170,15 +171,9 @@ export async function createStreamer(options: {
170171
const parsed = JSON.parse(message);
171172

172173
if (parsed.type === 'close') {
173-
if (!isClosed) {
174-
isClosed = true;
175-
cleanup();
176-
try {
177-
controller.close();
178-
} catch {
179-
// Ignore if already closed
180-
}
181-
}
174+
// A close signal can race ahead of chunk polling. Delay closure
175+
// until we have drained persisted entries (including final chunk).
176+
closeSignalReceived = true;
182177
}
183178
// For chunk notifications, we'll poll for new data
184179
} catch {
@@ -319,7 +314,7 @@ export async function createStreamer(options: {
319314
if (!newEntries || newEntries.length === 0) {
320315
// Check if stream is now closed
321316
const nowClosed = await redis.get(closedKey);
322-
if (nowClosed === '1' && !isClosed) {
317+
if ((closeSignalReceived || nowClosed === '1') && !isClosed) {
323318
isClosed = true;
324319
clearInterval(pollInterval);
325320
cleanup();

packages/turso/src/storage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ function toHook(row: HookRow, specVersion?: number): Hook {
209209
hookId: row.hookId,
210210
runId: row.runId,
211211
token: row.token,
212-
metadata: row.metadata,
212+
metadata: row.metadata ?? undefined,
213213
ownerId: row.ownerId,
214214
projectId: row.projectId,
215215
environment: row.environment,

packages/turso/src/streamer.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ export function createStreamer(config: StreamerConfig): Streamer {
174174
// Buffer for chunks that arrive during initial load
175175
const bufferedEventChunks: StreamChunk[] = [];
176176
let isLoadingFromStorage = true;
177+
let closeRequested = false;
177178

178179
// Handler for new chunks (real-time)
179180
const chunkHandler = (chunk: StreamChunk) => {
@@ -202,6 +203,12 @@ export function createStreamer(config: StreamerConfig): Streamer {
202203

203204
// Handler for stream close
204205
const closeHandler = () => {
206+
if (isLoadingFromStorage) {
207+
// Don't close immediately during initial load. A close event can
208+
// race ahead of buffered chunk delivery.
209+
closeRequested = true;
210+
return;
211+
}
205212
cleanup?.();
206213
try {
207214
controller.close();
@@ -274,6 +281,11 @@ export function createStreamer(config: StreamerConfig): Streamer {
274281
}
275282
}
276283

284+
if (closeRequested) {
285+
cleanup?.();
286+
controller.close();
287+
}
288+
277289
} catch (error) {
278290
cleanup?.();
279291
controller.error(error);

0 commit comments

Comments
 (0)