-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathRunStreamPresenter.server.ts
More file actions
203 lines (181 loc) · 7.27 KB
/
RunStreamPresenter.server.ts
File metadata and controls
203 lines (181 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import { type PrismaClient, prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { requireUserId } from "~/services/session.server";
import { singleton } from "~/utils/singleton";
import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse";
import { throttle } from "~/utils/throttle";
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
import { deserialiseMollifierSnapshot } from "~/v3/mollifier/mollifierSnapshot.server";
import { tracePubSub } from "~/v3/services/tracePubSub.server";
const PING_INTERVAL = 5_000;
const STREAM_TIMEOUT = 30_000;
export class RunStreamPresenter {
#prismaClient: PrismaClient;
constructor(prismaClient: PrismaClient = prisma) {
this.#prismaClient = prismaClient;
}
public createLoader() {
const prismaClient = this.#prismaClient;
return createSSELoader({
timeout: STREAM_TIMEOUT,
interval: PING_INTERVAL,
handler: async (context) => {
const runFriendlyId = context.params.runParam;
if (!runFriendlyId) {
throw new Response("Missing runParam", { status: 400 });
}
const userId = await requireUserId(context.request);
// Scope the lookup to organizations the requesting user is a member
// of, matching RunPresenter's run lookup. Unauthorized and missing
// runs are indistinguishable (both 404).
const run = await prismaClient.taskRun.findFirst({
where: {
friendlyId: runFriendlyId,
project: {
organization: {
members: {
some: {
userId,
},
},
},
},
},
select: {
traceId: true,
},
});
// Fall back to the mollifier buffer when the run isn't in PG yet.
// The buffered run has no execution events to stream, but we still
// attach a trace-pubsub subscription using the snapshot's traceId
// so that the moment the drainer materialises the row and execution
// begins, those events flow to this open SSE connection. Closing
// with 404 would force the dashboard to keep retrying.
let traceId: string | null = run?.traceId ?? null;
if (!traceId) {
const buffer = getMollifierBuffer();
if (buffer) {
try {
const entry = await buffer.getEntry(runFriendlyId);
// Same membership scoping as the PG lookup above — the buffer
// entry carries the owning org's id.
const isMember = entry
? (await prismaClient.orgMember.findFirst({
where: { organizationId: entry.orgId, userId },
select: { id: true },
})) !== null
: false;
if (entry && isMember) {
// Go through the webapp wrapper so this read-side module
// shares a single deserialisation path with readFallback —
// see the contract comment in syntheticRedirectInfo.server.ts.
const snapshot = deserialiseMollifierSnapshot(entry.payload);
if (typeof snapshot.traceId === "string") {
traceId = snapshot.traceId;
}
}
} catch (err) {
logger.warn("RunStreamPresenter buffer fallback failed", {
runFriendlyId,
err: err instanceof Error ? err.message : String(err),
});
}
}
}
if (!traceId) {
throw new Response("Not found", { status: 404 });
}
const resolvedRun = { traceId };
logger.info("RunStreamPresenter.start", {
runFriendlyId,
traceId: resolvedRun.traceId,
});
// Subscribe to trace updates
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(resolvedRun.traceId);
// Only send max every 1 second
const throttledSend = throttle(
(args: { send: SendFunction; event?: string; data: string }) => {
try {
args.send({ event: args.event, data: args.data });
} catch (error) {
if (error instanceof Error) {
if (error.name !== "TypeError") {
logger.debug("Error sending SSE in RunStreamPresenter", {
error: {
name: error.name,
message: error.message,
stack: error.stack,
},
});
}
}
// Abort the stream on send error. Uses a stackless string sentinel
// from sse.ts — a no-arg abort() would create a DOMException with a
// stack trace, which is unnecessary retention on the signal.reason.
context.controller.abort(ABORT_REASON_SEND_ERROR);
}
},
1000
);
let messageListener: ((event: string) => void) | undefined;
return {
initStream: ({ send }) => {
// Create throttled send function
throttledSend({ send, event: "message", data: new Date().toISOString() });
// Set up message listener for pub/sub events
messageListener = (event: string) => {
throttledSend({ send, event: "message", data: event });
};
eventEmitter.addListener("message", messageListener);
context.debug("Subscribed to trace pub/sub");
},
iterator: ({ send }) => {
// Send ping to keep connection alive
try {
// Send an actual message so the client refreshes
throttledSend({ send, event: "message", data: new Date().toISOString() });
} catch (error) {
// If we can't send a ping, the connection is likely dead
return false;
}
},
cleanup: () => {
logger.info("RunStreamPresenter.cleanup", {
runFriendlyId,
traceId: resolvedRun.traceId,
});
// Remove message listener
if (messageListener) {
eventEmitter.removeListener("message", messageListener);
}
eventEmitter.removeAllListeners();
// Unsubscribe from Redis pub/sub
unsubscribe()
.then(() => {
logger.info("RunStreamPresenter.cleanup.unsubscribe succeeded", {
runFriendlyId,
traceId: resolvedRun.traceId,
});
})
.catch((error) => {
logger.error("RunStreamPresenter.cleanup.unsubscribe failed", {
runFriendlyId,
traceId: resolvedRun.traceId,
error: {
name: error.name,
message: error.message,
stack: error.stack,
},
});
});
},
};
},
});
}
}
// Export a singleton loader for the route to use
export const runStreamLoader = singleton("runStreamLoader", () => {
const presenter = new RunStreamPresenter();
return presenter.createLoader();
});