-
Notifications
You must be signed in to change notification settings - Fork 53
Expand file tree
/
Copy pathworker.ts
More file actions
375 lines (350 loc) · 13.7 KB
/
worker.ts
File metadata and controls
375 lines (350 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
import { Installation } from "@/installation"
import { Server } from "@/server/server"
import { Log } from "@/util/log"
import { Instance } from "@/project/instance"
import { InstanceBootstrap } from "@/project/bootstrap"
import { Rpc } from "@/util/rpc"
import { upgrade } from "@/cli/upgrade"
import { Config } from "@/config/config"
import { GlobalBus } from "@/bus/global"
import { createOpencodeClient, type Event } from "@opencode-ai/sdk/v2"
import type { BunWebSocketData } from "hono/bun"
import { Flag } from "@/flag/flag"
import { setTimeout as sleep } from "node:timers/promises"
// altimate_change start — trace: session tracing in TUI
import { Trace, FileExporter, HttpExporter, type TraceExporter } from "@/altimate/observability/tracing"
// altimate_change end
await Log.init({
print: process.argv.includes("--print-logs"),
dev: Installation.isLocal(),
level: (() => {
if (Installation.isLocal()) return "DEBUG"
return "INFO"
})(),
})
process.on("unhandledRejection", (e) => {
Log.Default.error("rejection", {
e: e instanceof Error ? e.message : e,
})
})
process.on("uncaughtException", (e) => {
Log.Default.error("exception", {
e: e instanceof Error ? e.message : e,
})
// altimate_change start — crash: flush traces on uncaught exception
// After logging, write all active traces to disk so crash context is preserved.
// The process may continue or exit depending on the exception — either way the
// trace snapshot will reflect the crash.
flushAllTracesSync(`Uncaught exception: ${e instanceof Error ? e.message : String(e)}`)
// altimate_change end
})
// Subscribe to global events and forward them via RPC
GlobalBus.on("event", (event) => {
Rpc.emit("global.event", event)
})
let server: Bun.Server<BunWebSocketData> | undefined
const eventStream = {
abort: undefined as AbortController | undefined,
}
// altimate_change start — trace: per-session traces
const sessionTraces = new Map<string, Trace>()
const sessionUserMsgIds = new Map<string, Set<string>>() // Per-session user message IDs (cleaned up on session end)
const MAX_TRACES = 100
// Cached tracing config — loaded once at first use
let tracingConfigLoaded = false
let tracingEnabled = true
let tracingExporters: TraceExporter[] | undefined
let tracingMaxFiles: number | undefined
async function loadTracingConfig() {
if (tracingConfigLoaded) return
tracingConfigLoaded = true
try {
const cfg = await Config.get()
const tc = cfg.tracing
if (tc?.enabled === false) { tracingEnabled = false; return }
const exporters: TraceExporter[] = [new FileExporter(tc?.dir)]
if (tc?.exporters) {
for (const exp of tc.exporters) {
exporters.push(new HttpExporter(exp.name, exp.endpoint, exp.headers))
}
}
tracingExporters = exporters
tracingMaxFiles = tc?.maxFiles
} catch {
// Config failure should not prevent TUI from working
}
}
// altimate_change end
// altimate_change start — trace: get or create per-session trace
function getOrCreateTrace(sessionID: string): Trace | null {
if (!sessionID || !tracingEnabled) return null
if (sessionTraces.has(sessionID)) return sessionTraces.get(sessionID)!
try {
if (sessionTraces.size >= MAX_TRACES) {
const oldest = sessionTraces.keys().next().value
if (oldest) {
Log.Default.warn(`[tracing] Evicting trace for session ${oldest} — ${MAX_TRACES} concurrent sessions reached`)
sessionTraces.get(oldest)?.endTrace().catch(() => {})
sessionTraces.delete(oldest)
sessionUserMsgIds.delete(oldest)
}
}
const trace = tracingExporters
? Trace.withExporters([...tracingExporters], { maxFiles: tracingMaxFiles })
: Trace.create()
trace.startTrace(sessionID, {})
Trace.setActive(trace)
sessionTraces.set(sessionID, trace)
return trace
} catch {
return null
}
}
// altimate_change end
const startEventStream = (input: { directory: string; workspaceID?: string }) => {
if (eventStream.abort) eventStream.abort.abort()
// altimate_change start — crash: flush stale traces before clearing
// Flush any in-flight traces synchronously before clearing — endTrace() is
// async and a crash during the gap would lose trace data.
for (const [, trace] of sessionTraces) {
void trace.endTrace().catch(() => {})
}
sessionTraces.clear()
sessionUserMsgIds.clear()
// altimate_change end
const abort = new AbortController()
eventStream.abort = abort
const signal = abort.signal
const fetchFn = (async (input: RequestInfo | URL, init?: RequestInit) => {
const request = new Request(input, init)
const auth = getAuthorizationHeader()
if (auth) request.headers.set("Authorization", auth)
return Server.Default().fetch(request)
}) as typeof globalThis.fetch
const sdk = createOpencodeClient({
baseUrl: "http://altimate-code.internal",
directory: input.directory,
experimental_workspaceID: input.workspaceID,
fetch: fetchFn,
signal,
})
;(async () => {
// Load tracing config once before processing events
await loadTracingConfig()
while (!signal.aborted) {
const events = await Promise.resolve(
sdk.event.subscribe(
{},
{
signal,
},
),
).catch(() => undefined)
if (!events) {
await sleep(250)
continue
}
for await (const event of events.stream) {
// altimate_change start — trace: feed events to per-session trace
try {
if (event.type === "message.updated") {
const info = (event as any).properties?.info
// Resolve sessionID: use info.sessionID directly, or fall back to
// finding the session via info.parentID (assistant messages may only
// carry the parent message ID, not the session ID).
let resolvedSessionID = info?.sessionID as string | undefined
if (!resolvedSessionID && info?.parentID) {
for (const [sid, msgIds] of sessionUserMsgIds) {
if (msgIds.has(info.parentID)) {
resolvedSessionID = sid
break
}
}
}
if (resolvedSessionID) {
// Create trace eagerly on user message (arrives before part events)
const trace = sessionTraces.get(resolvedSessionID) ?? (info.role === "user" ? getOrCreateTrace(resolvedSessionID) : null)
if (info.role === "user") {
if (info.id) {
if (!sessionUserMsgIds.has(resolvedSessionID)) sessionUserMsgIds.set(resolvedSessionID, new Set())
sessionUserMsgIds.get(resolvedSessionID)!.add(info.id)
}
if (trace) {
const title = (info as any).summary?.title || (info as any).summary?.body
if (title) trace.setTitle(String(title).slice(0, 80), String(title))
}
}
if (info.role === "assistant") {
const r = trace ?? getOrCreateTrace(resolvedSessionID)
r?.enrichFromAssistant({
modelID: info.modelID,
providerID: info.providerID,
agent: info.agent,
variant: info.variant,
})
}
}
}
// altimate_change end
// altimate_change start — trace: part events
if (event.type === "message.part.updated") {
const part = (event as any).properties?.part
if (part) {
// Create trace on first event for this session (lazy creation)
const trace = sessionTraces.get(part.sessionID) ?? getOrCreateTrace(part.sessionID)
if (trace) {
if (part.type === "step-start") trace.logStepStart(part)
if (part.type === "step-finish") trace.logStepFinish(part)
if (part.type === "text" && part.time?.end) {
if (part.messageID && sessionUserMsgIds.get(part.sessionID)?.has(part.messageID)) {
// This is user prompt text — capture as title/prompt
const text = String(part.text || "")
if (text) trace.setTitle(text.slice(0, 80), text)
} else {
// This is assistant response text
trace.logText(part)
}
}
if (part.type === "tool" && (part.state?.status === "completed" || part.state?.status === "error")) {
trace.logToolCall(part)
}
}
}
}
// altimate_change end
// altimate_change start — trace: session title capture and finalization
// Capture session title from session.updated events
if (event.type === "session.updated") {
const info = (event as any).properties?.info
if (info?.id && info?.title) {
const trace = sessionTraces.get(info.id)
if (trace) trace.setTitle(String(info.title))
}
}
// Finalize trace when session reaches idle (completed)
if (event.type === "session.status") {
const sid = (event as any).properties?.sessionID
const status = (event as any).properties?.status?.type
if (status === "idle" && sid) {
const trace = sessionTraces.get(sid)
if (trace) {
// altimate_change start — crash: defer deletion until endTrace() completes
// Keep the trace in sessionTraces during async teardown so
// flushAllTracesSync() can still reach it if a crash occurs
// while endTrace() is in flight.
void trace.endTrace().catch(() => {}).finally(() => {
sessionTraces.delete(sid)
sessionUserMsgIds.delete(sid)
})
// altimate_change end
}
}
}
} catch {
// Trace must never interrupt event forwarding
}
// altimate_change end
Rpc.emit("event", event as Event)
}
if (!signal.aborted) {
await sleep(250)
}
}
})().catch((error) => {
Log.Default.error("event stream error", {
error: error instanceof Error ? error.message : error,
})
})
}
startEventStream({ directory: process.cwd() })
export const rpc = {
async fetch(input: { url: string; method: string; headers: Record<string, string>; body?: string }) {
const headers = { ...input.headers }
const auth = getAuthorizationHeader()
if (auth && !headers["authorization"] && !headers["Authorization"]) {
headers["Authorization"] = auth
}
const request = new Request(input.url, {
method: input.method,
headers,
body: input.body,
})
const response = await Server.Default().fetch(request)
const body = await response.text()
return {
status: response.status,
headers: Object.fromEntries(response.headers.entries()),
body,
}
},
async server(input: { port: number; hostname: string; mdns?: boolean; cors?: string[] }) {
if (server) await server.stop(true)
server = Server.listen(input)
return { url: server.url.toString() }
},
async checkUpgrade(input: { directory: string }) {
await Instance.provide({
directory: input.directory,
init: InstanceBootstrap,
fn: async () => {
await upgrade().catch((err) => {
// Never silently swallow upgrade errors — if this fails, users
// get locked on old versions with no way to self-heal.
console.error("[upgrade] check failed:", String(err))
})
},
})
},
async reload() {
Config.global.reset()
await Instance.disposeAll()
},
async setWorkspace(input: { workspaceID?: string }) {
startEventStream({ directory: process.cwd(), workspaceID: input.workspaceID })
},
async shutdown() {
Log.Default.info("worker shutting down")
if (eventStream.abort) eventStream.abort.abort()
// altimate_change start — trace: flush all active traces on shutdown
for (const [sid, trace] of sessionTraces) {
await trace.endTrace().catch(() => {})
}
sessionTraces.clear()
sessionUserMsgIds.clear()
// altimate_change end
await Instance.disposeAll()
if (server) server.stop(true)
},
}
Rpc.listen(rpc)
// altimate_change start — crash: flush active traces on unexpected exit
// When the worker is terminated (via worker.terminate() from the main thread,
// or on uncaught exceptions), write all in-flight traces to disk synchronously.
//
// NOTE: Bun Workers do NOT receive OS signals (SIGINT, SIGTERM, SIGHUP) —
// those are delivered only to the main thread. Signal-based flush is handled
// in thread.ts by terminating the worker, which triggers the "exit" event here.
let firstFlushReason: string | undefined
function flushAllTracesSync(reason: string) {
// Preserve the most specific reason from the first flush (e.g., the uncaught
// exception message) even if a later handler (exit) calls again with a
// generic reason. Subsequent calls still flush — new traces may have been
// created since the first call.
const effectiveReason = firstFlushReason ?? reason
firstFlushReason ??= reason
for (const [, trace] of sessionTraces) {
try {
trace.flushSync(effectiveReason)
} catch {
// flushSync is best-effort — must never throw in an exit handler
}
}
}
process.once("exit", () => { flushAllTracesSync("Process exited") })
// altimate_change end
function getAuthorizationHeader(): string | undefined {
const password = Flag.OPENCODE_SERVER_PASSWORD
if (!password) return undefined
const username = Flag.OPENCODE_SERVER_USERNAME ?? "opencode"
return `Basic ${btoa(`${username}:${password}`)}`
}