|
| 1 | +import { logger } from "./logger"; |
| 2 | + |
// Inngest REST API configuration, read once at module load.
// Empty-string fallbacks keep the module importable without env vars;
// fetchDeploymentJobs checks both values before issuing any request.
const baseUrl = process.env.INNGEST_BASE_URL ?? "";
const signingKey = process.env.INNGEST_SIGNING_KEY ?? "";

// Upper bound on events accumulated across pagination (see fetchInngestEvents).
const DEFAULT_MAX_EVENTS = 500;
const MAX_EVENTS = DEFAULT_MAX_EVENTS;
| 8 | + |
/** Event shape from GET /v1/events (https://api.inngest.com/v1/events) */
type InngestEventRow = {
  internal_id?: string; // also used as a pagination-cursor fallback in fetchInngestEvents
  accountID?: string;
  environmentID?: string;
  source?: string;
  sourceID?: string | null;
  /** RFC3339 timestamp – API uses receivedAt, dev server may use received_at */
  receivedAt?: string;
  received_at?: string;
  id: string; // event ID, used to fetch this event's runs
  name: string; // event name, e.g. "deployment/requested"
  data: Record<string, unknown>; // event payload; expected to carry serverId for deployment events
  user?: unknown;
  ts: number; // event timestamp; treated as ms since epoch (compared against Date#getTime values)
  v?: string | null;
  metadata?: {
    fetchedAt: string;
    cachedUntil: string | null;
  };
};
| 30 | + |
/** Run shape from GET /v1/events/{eventId}/runs – the actual job execution */
type InngestRun = {
  run_id: string;
  event_id: string;
  status: string; // "Running" | "Completed" | "Failed" | "Cancelled" | "Queued"?
  run_started_at?: string; // RFC3339 start time
  ended_at?: string | null; // RFC3339 end time; null/absent while still running
  output?: unknown; // run result; on failure may be an object with an `error` field
  // dev server / API may use different casing
  run_started_at_ms?: number; // ms-since-epoch variant, preferred over run_started_at when present
};
| 42 | + |
| 43 | +function getEventReceivedAt(ev: InngestEventRow): string | undefined { |
| 44 | + return ev.receivedAt ?? ev.received_at; |
| 45 | +} |
| 46 | + |
| 47 | +/** Map Inngest run status to BullMQ-style state for the UI */ |
| 48 | +function runStatusToState( |
| 49 | + status: string, |
| 50 | +): "pending" | "active" | "completed" | "failed" | "cancelled" { |
| 51 | + const s = status.toLowerCase(); |
| 52 | + if (s === "running") return "active"; |
| 53 | + if (s === "completed") return "completed"; |
| 54 | + if (s === "failed") return "failed"; |
| 55 | + if (s === "cancelled") return "cancelled"; |
| 56 | + if (s === "queued") return "pending"; |
| 57 | + return "pending"; |
| 58 | +} |
| 59 | + |
| 60 | +export const fetchInngestEvents = async () => { |
| 61 | + const maxEvents = MAX_EVENTS; |
| 62 | + const all: InngestEventRow[] = []; |
| 63 | + let cursor: string | undefined; |
| 64 | + |
| 65 | + do { |
| 66 | + const params = new URLSearchParams({ limit: "100" }); |
| 67 | + if (cursor) { |
| 68 | + params.set("cursor", cursor); |
| 69 | + } |
| 70 | + |
| 71 | + const res = await fetch(`${baseUrl}/v1/events?${params}`, { |
| 72 | + headers: { |
| 73 | + Authorization: `Bearer ${signingKey}`, |
| 74 | + "Content-Type": "application/json", |
| 75 | + }, |
| 76 | + }); |
| 77 | + |
| 78 | + if (!res.ok) { |
| 79 | + logger.warn("Inngest API error", { |
| 80 | + status: res.status, |
| 81 | + body: await res.text(), |
| 82 | + }); |
| 83 | + break; |
| 84 | + } |
| 85 | + |
| 86 | + const body = (await res.json()) as { |
| 87 | + data?: InngestEventRow[]; |
| 88 | + cursor?: string; |
| 89 | + nextCursor?: string; |
| 90 | + }; |
| 91 | + const data = Array.isArray(body.data) ? body.data : []; |
| 92 | + all.push(...data); |
| 93 | + |
| 94 | + // Next page: API may return cursor/nextCursor, or use last event's internal_id (per API docs) |
| 95 | + const nextCursor = |
| 96 | + body.cursor ?? body.nextCursor ?? data[data.length - 1]?.internal_id; |
| 97 | + const hasMore = data.length === 100 && nextCursor && all.length < maxEvents; |
| 98 | + cursor = hasMore ? nextCursor : undefined; |
| 99 | + } while (cursor); |
| 100 | + |
| 101 | + return all.slice(0, maxEvents); |
| 102 | +}; |
| 103 | + |
| 104 | +/** Fetch runs for a single event (GET /v1/events/{eventId}/runs) – runs are the actual jobs */ |
| 105 | +export const fetchInngestRunsForEvent = async ( |
| 106 | + eventId: string, |
| 107 | +): Promise<InngestRun[]> => { |
| 108 | + const res = await fetch( |
| 109 | + `${baseUrl}/v1/events/${encodeURIComponent(eventId)}/runs`, |
| 110 | + { |
| 111 | + headers: { |
| 112 | + Authorization: `Bearer ${signingKey}`, |
| 113 | + "Content-Type": "application/json", |
| 114 | + }, |
| 115 | + }, |
| 116 | + ); |
| 117 | + if (!res.ok) { |
| 118 | + logger.warn("Inngest runs API error", { |
| 119 | + eventId, |
| 120 | + status: res.status, |
| 121 | + body: await res.text(), |
| 122 | + }); |
| 123 | + return []; |
| 124 | + } |
| 125 | + const body = (await res.json()) as { data?: InngestRun[] }; |
| 126 | + return Array.isArray(body.data) ? body.data : []; |
| 127 | +}; |
| 128 | + |
/** One row for the queue UI (BullMQ-compatible shape) */
export type DeploymentJobRow = {
  id: string; // run_id when a run exists, otherwise the triggering event's id
  name: string; // event name, e.g. "deployment/requested"
  data: Record<string, unknown>; // original event payload
  timestamp: number; // ms since epoch (run start, or event ts when no run yet)
  processedOn?: number; // ms since epoch when processing started
  finishedOn?: number; // ms since epoch when the run ended (terminal states only)
  failedReason?: string; // error extracted from a failed run's output
  state: string; // "pending" | "active" | "completed" | "failed" | "cancelled"
};
| 140 | + |
| 141 | +/** Build queue rows from events + their runs (one row per run, or pending if no run yet) */ |
| 142 | +function buildDeploymentRowsFromRuns( |
| 143 | + events: InngestEventRow[], |
| 144 | + runsByEventId: Map<string, InngestRun[]>, |
| 145 | + serverId: string, |
| 146 | +): DeploymentJobRow[] { |
| 147 | + const requested = events.filter( |
| 148 | + (e) => |
| 149 | + e.name === "deployment/requested" && |
| 150 | + (e.data as Record<string, unknown>)?.serverId === serverId, |
| 151 | + ); |
| 152 | + const rows: DeploymentJobRow[] = []; |
| 153 | + |
| 154 | + for (const ev of requested) { |
| 155 | + const data = (ev.data ?? {}) as Record<string, unknown>; |
| 156 | + const runs = runsByEventId.get(ev.id) ?? []; |
| 157 | + |
| 158 | + if (runs.length === 0) { |
| 159 | + // Queued: event received but no run yet |
| 160 | + rows.push({ |
| 161 | + id: ev.id, |
| 162 | + name: ev.name, |
| 163 | + data, |
| 164 | + timestamp: ev.ts, |
| 165 | + processedOn: ev.ts, |
| 166 | + finishedOn: undefined, |
| 167 | + failedReason: undefined, |
| 168 | + state: "pending", |
| 169 | + }); |
| 170 | + continue; |
| 171 | + } |
| 172 | + |
| 173 | + for (const run of runs) { |
| 174 | + const state = runStatusToState(run.status); |
| 175 | + const runStartedMs = |
| 176 | + run.run_started_at_ms ?? |
| 177 | + (run.run_started_at ? new Date(run.run_started_at).getTime() : ev.ts); |
| 178 | + const endedMs = run.ended_at |
| 179 | + ? new Date(run.ended_at).getTime() |
| 180 | + : undefined; |
| 181 | + const failedReason = |
| 182 | + state === "failed" && |
| 183 | + run.output && |
| 184 | + typeof run.output === "object" && |
| 185 | + "error" in run.output |
| 186 | + ? String((run.output as { error?: unknown }).error) |
| 187 | + : undefined; |
| 188 | + |
| 189 | + rows.push({ |
| 190 | + id: run.run_id, |
| 191 | + name: ev.name, |
| 192 | + data, |
| 193 | + timestamp: runStartedMs, |
| 194 | + processedOn: runStartedMs, |
| 195 | + finishedOn: |
| 196 | + state === "completed" || state === "failed" || state === "cancelled" |
| 197 | + ? endedMs |
| 198 | + : undefined, |
| 199 | + failedReason, |
| 200 | + state, |
| 201 | + }); |
| 202 | + } |
| 203 | + } |
| 204 | + |
| 205 | + return rows.sort((a, b) => (b.timestamp ?? 0) - (a.timestamp ?? 0)); |
| 206 | +} |
| 207 | + |
| 208 | +/** Fetch deployment jobs for a server: events → runs → rows (correct model: runs = jobs) */ |
| 209 | +export const fetchDeploymentJobs = async ( |
| 210 | + serverId: string, |
| 211 | +): Promise<DeploymentJobRow[]> => { |
| 212 | + if (!signingKey) { |
| 213 | + logger.warn("INNGEST_SIGNING_KEY not set, returning empty jobs list"); |
| 214 | + return []; |
| 215 | + } |
| 216 | + if (!baseUrl) { |
| 217 | + throw new Error("INNGEST_BASE_URL is required to list deployment jobs"); |
| 218 | + } |
| 219 | + |
| 220 | + const events = await fetchInngestEvents(); |
| 221 | + |
| 222 | + const requestedForServer = events.filter( |
| 223 | + (e) => |
| 224 | + e.name === "deployment/requested" && |
| 225 | + (e.data as Record<string, unknown>)?.serverId === serverId, |
| 226 | + ); |
| 227 | + // Limit to avoid too many run fetches |
| 228 | + const toFetch = requestedForServer.slice(0, 50); |
| 229 | + const runsByEventId = new Map<string, InngestRun[]>(); |
| 230 | + |
| 231 | + await Promise.all( |
| 232 | + toFetch.map(async (ev) => { |
| 233 | + const runs = await fetchInngestRunsForEvent(ev.id); |
| 234 | + runsByEventId.set(ev.id, runs); |
| 235 | + }), |
| 236 | + ); |
| 237 | + |
| 238 | + return buildDeploymentRowsFromRuns(toFetch, runsByEventId, serverId); |
| 239 | +}; |
0 commit comments