Skip to content

Commit 0e43ecc

Browse files
fix(tracing): key runs by user message id
1 parent 876b0e2 commit 0e43ecc

8 files changed

Lines changed: 181 additions & 77 deletions

File tree

src/handlers/message.ts

Lines changed: 14 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,7 @@ export function handleMessageUpdated(e: EventMessageUpdated, ctx: HandlerContext
6161
const msg = e.properties.info
6262
if (msg.role !== "assistant") return
6363
const assistant = msg as AssistantMessage
64+
setBoundedMap(ctx.assistantRuns, assistant.id, assistant.parentID)
6465
if (!assistant.time.completed) return
6566

6667
const { sessionID, modelID, providerID } = assistant
@@ -287,7 +288,9 @@ export function handleMessagePartUpdated(e: EventMessagePartUpdated, ctx: Handle
287288
...ctx.commonAttrs,
288289
},
289290
},
290-
resolveSessionTraceContext(toolPart.sessionID, ctx),
291+
resolveSessionTraceContext(toolPart.sessionID, ctx, {
292+
assistantMessageID: toolPart.messageID,
293+
}),
291294
)
292295
})()
293296
: undefined
@@ -339,7 +342,9 @@ export function handleMessagePartUpdated(e: EventMessagePartUpdated, ctx: Handle
339342
...ctx.commonAttrs,
340343
},
341344
},
342-
resolveSessionTraceContext(toolPart.sessionID, ctx),
345+
resolveSessionTraceContext(toolPart.sessionID, ctx, {
346+
assistantMessageID: toolPart.messageID,
347+
}),
343348
)
344349
})()
345350
toolSpan.setAttributes({ [AGENT_NAME]: agentName, "agent.type": agentType })
@@ -411,6 +416,7 @@ export function handleMessagePartUpdated(e: EventMessagePartUpdated, ctx: Handle
411416
export function startMessageSpan(
412417
sessionID: string,
413418
messageID: string,
419+
parentID: string,
414420
modelID: string,
415421
providerID: string,
416422
startTime: number,
@@ -419,7 +425,9 @@ export function startMessageSpan(
419425
if (!isTraceEnabled("llm", ctx)) return
420426
const msgKey = `${sessionID}:${messageID}`
421427
if (ctx.messageSpans.has(msgKey)) return
428+
setBoundedMap(ctx.assistantRuns, messageID, parentID)
422429
const { agentName, agentType } = getSessionAgentMeta(sessionID, ctx)
430+
const inputText = ctx.runInputs.get(parentID)
423431

424432
const msgSpan = ctx.tracer.startSpan(
425433
`${ctx.tracePrefix}llm`,
@@ -434,17 +442,17 @@ export function startMessageSpan(
434442
[LLM_SYSTEM]: providerID,
435443
[LLM_PROVIDER]: providerID,
436444
[LLM_MODEL_NAME]: modelID,
437-
...(ctx.sessionInputs.has(sessionID)
445+
...(inputText
438446
? {
439-
[INPUT_VALUE]: ctx.sessionInputs.get(sessionID)!,
447+
[INPUT_VALUE]: inputText,
440448
[INPUT_MIME_TYPE]: MimeType.TEXT,
441-
[LLM_INPUT_MESSAGES]: JSON.stringify([{ role: "user", content: ctx.sessionInputs.get(sessionID)! }]),
449+
[LLM_INPUT_MESSAGES]: JSON.stringify([{ role: "user", content: inputText }]),
442450
}
443451
: {}),
444452
...ctx.commonAttrs,
445453
},
446454
},
447-
resolveSessionTraceContext(sessionID, ctx),
455+
resolveSessionTraceContext(sessionID, ctx, { runID: parentID, assistantMessageID: messageID }),
448456
)
449457
setBoundedMap(ctx.messageSpans, msgKey, msgSpan)
450458
}

src/handlers/session.ts

Lines changed: 18 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -24,16 +24,21 @@ import type { HandlerContext, SessionAgentType } from "../types.ts"
2424

2525
const OPENINFERENCE_SPAN_KIND = SemanticConventions.OPENINFERENCE_SPAN_KIND
2626

27+
/** Starts or refreshes the root run span for a single user turn, keyed by the user message ID. */
2728
export function handleRunStarted(
29+
runID: string,
2830
sessionID: string,
2931
agent: string,
3032
promptText: string,
3133
model: string,
3234
startTime: number,
3335
ctx: HandlerContext,
3436
) {
37+
ctx.activeRuns.set(sessionID, runID)
38+
ctx.pendingRuns.delete(sessionID)
39+
if (promptText) setBoundedMap(ctx.runInputs, runID, promptText)
3540
if (!isTraceEnabled("session", ctx)) return
36-
const existing = ctx.runSpans.get(sessionID)
41+
const existing = ctx.runSpans.get(runID)
3742
if (existing) {
3843
existing.setAttributes({
3944
[AGENT_NAME]: agent,
@@ -72,9 +77,8 @@ export function handleRunStarted(
7277
},
7378
ctx.rootContext(),
7479
)
75-
setBoundedMap(ctx.runSpans, sessionID, runSpan)
76-
setBoundedMap(ctx.runSpanContexts, sessionID, runSpan.spanContext())
77-
setBoundedMap(ctx.sessionRunRoots, sessionID, sessionID)
80+
ctx.runSpans.set(runID, runSpan)
81+
setBoundedMap(ctx.runSpanContexts, runID, runSpan.spanContext())
7882
}
7983

8084
/** Increments the session counter, records start time, starts the root session span, and emits a `session.created` log event. */
@@ -89,8 +93,6 @@ export function handleSessionCreated(e: EventSessionCreated, ctx: HandlerContext
8993
setBoundedMap(ctx.sessionTotals, sessionID, { startMs: createdAt, tokens: 0, cost: 0, messages: 0, agent: "unknown", agentType })
9094

9195
if (isTraceEnabled("session", ctx) && parentID) {
92-
const runRootID = ctx.sessionRunRoots.get(parentID) ?? parentID
93-
setBoundedMap(ctx.sessionRunRoots, sessionID, runRootID)
9496
const sessionSpan = ctx.tracer.startSpan(
9597
`${ctx.tracePrefix}session`,
9698
{
@@ -106,7 +108,7 @@ export function handleSessionCreated(e: EventSessionCreated, ctx: HandlerContext
106108
},
107109
resolveSessionTraceContext(parentID, ctx),
108110
)
109-
setBoundedMap(ctx.sessionSpans, sessionID, sessionSpan)
111+
ctx.sessionSpans.set(sessionID, sessionSpan)
110112
setBoundedMap(ctx.sessionSpanContexts, sessionID, sessionSpan.spanContext())
111113
}
112114

@@ -138,7 +140,7 @@ function sweepSession(sessionID: string, ctx: HandlerContext) {
138140
ctx.pendingToolSpans.delete(key)
139141
}
140142
}
141-
ctx.sessionInputs.delete(sessionID)
143+
ctx.pendingRuns.delete(sessionID)
142144
const msgPrefix = `${sessionID}:`
143145
for (const [key, span] of ctx.messageSpans) {
144146
if (key.startsWith(msgPrefix)) {
@@ -192,7 +194,9 @@ export function handleSessionIdle(e: EventSessionIdle, ctx: HandlerContext) {
192194
sessionSpan.end()
193195
ctx.sessionSpans.delete(sessionID)
194196
}
195-
const runSpan = ctx.runSpans.get(sessionID)
197+
const runID = ctx.activeRuns.get(sessionID)
198+
if (runID) ctx.activeRuns.delete(sessionID)
199+
const runSpan = runID ? ctx.runSpans.get(runID) : undefined
196200
if (runSpan) {
197201
if (totals) {
198202
runSpan.setAttributes({
@@ -205,7 +209,7 @@ export function handleSessionIdle(e: EventSessionIdle, ctx: HandlerContext) {
205209
}
206210
runSpan.setStatus({ code: SpanStatusCode.OK })
207211
runSpan.end()
208-
ctx.runSpans.delete(sessionID)
212+
ctx.runSpans.delete(runID!)
209213
}
210214

211215
ctx.emitLog({
@@ -252,13 +256,15 @@ export function handleSessionError(e: EventSessionError, ctx: HandlerContext) {
252256
sessionSpan.end()
253257
ctx.sessionSpans.delete(rawID)
254258
}
255-
const runSpan = ctx.runSpans.get(rawID)
259+
const runID = ctx.activeRuns.get(rawID)
260+
if (runID) ctx.activeRuns.delete(rawID)
261+
const runSpan = runID ? ctx.runSpans.get(runID) : undefined
256262
if (runSpan) {
257263
if (totals) runSpan.setAttributes({ [AGENT_NAME]: totals.agent, "agent.type": totals.agentType })
258264
runSpan.setStatus({ code: SpanStatusCode.ERROR, message: error })
259265
runSpan.setAttribute("error", error)
260266
runSpan.end()
261-
ctx.runSpans.delete(rawID)
267+
ctx.runSpans.delete(runID!)
262268
}
263269
}
264270

src/index.ts

Lines changed: 43 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -106,11 +106,13 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree
106106
const sessionDiffTotals = new Map()
107107
const runSpans = new Map()
108108
const runSpanContexts = new Map()
109-
const sessionRunRoots = new Map()
109+
const activeRuns = new Map()
110+
const assistantRuns = new Map()
111+
const pendingRuns = new Map()
112+
const runInputs = new Map()
110113
const sessionSpans = new Map()
111114
const sessionSpanContexts = new Map()
112115
const messageSpans = new Map()
113-
const sessionInputs = new Map()
114116
const messageOutputs = new Map()
115117
const { disabledMetrics, disabledTraces } = config
116118
const commonAttrs = {
@@ -146,11 +148,13 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree
146148
rootContext,
147149
runSpans,
148150
runSpanContexts,
149-
sessionRunRoots,
151+
activeRuns,
152+
assistantRuns,
153+
pendingRuns,
154+
runInputs,
150155
sessionSpans,
151156
sessionSpanContexts,
152157
messageSpans,
153-
sessionInputs,
154158
messageOutputs,
155159
}
156160

@@ -220,16 +224,26 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree
220224
return ""
221225
}
222226
}).filter(Boolean).join("\n")
223-
sessionInputs.set(input.sessionID, promptText)
224227
if (!sessionSpan) {
225-
handleRunStarted(
226-
input.sessionID,
227-
agent,
228-
promptText,
229-
input.model ? `${input.model.providerID}/${input.model.modelID}` : "unknown",
230-
startTime,
231-
ctx,
232-
)
228+
const model = input.model ? `${input.model.providerID}/${input.model.modelID}` : "unknown"
229+
if (input.messageID) {
230+
handleRunStarted(
231+
input.messageID,
232+
input.sessionID,
233+
agent,
234+
promptText,
235+
model,
236+
startTime,
237+
ctx,
238+
)
239+
} else {
240+
setBoundedMap(pendingRuns, input.sessionID, {
241+
agent,
242+
promptText,
243+
model,
244+
startTime,
245+
})
246+
}
233247
}
234248
const promptLength = promptText.length
235249
emitLog({
@@ -280,10 +294,26 @@ export const OtelPlugin: Plugin = async ({ project, client, directory, worktree
280294
case "message.updated": {
281295
const msgEvt = event as EventMessageUpdated
282296
const info = msgEvt.properties.info
297+
if (info.role === "user") {
298+
const pendingRun = pendingRuns.get(info.sessionID)
299+
if (!sessionSpans.has(info.sessionID) && (pendingRun || activeRuns.get(info.sessionID) !== info.id)) {
300+
handleRunStarted(
301+
info.id,
302+
info.sessionID,
303+
pendingRun?.agent ?? info.agent,
304+
pendingRun?.promptText ?? "",
305+
pendingRun?.model ?? `${info.model.providerID}/${info.model.modelID}`,
306+
pendingRun?.startTime ?? info.time.created,
307+
ctx,
308+
)
309+
}
310+
break
311+
}
283312
if (info.role === "assistant" && !info.time?.completed) {
284313
startMessageSpan(
285314
info.sessionID,
286315
info.id,
316+
info.parentID,
287317
info.modelID ?? "unknown",
288318
info.providerID ?? "unknown",
289319
info.time?.created ?? Date.now(),

src/types.ts

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,14 @@ export type SessionTotals = {
6767
agentType: SessionAgentType
6868
}
6969

70+
/** Pending root-run metadata captured from `chat.message` until the user message ID is known. */
71+
export type PendingRun = {
72+
agent: string
73+
promptText: string
74+
model: string
75+
startTime: number
76+
}
77+
7078
/** Shared context threaded through every event handler. */
7179
export type HandlerContext = {
7280
log: PluginLogger
@@ -84,10 +92,12 @@ export type HandlerContext = {
8492
rootContext: () => Context
8593
runSpans: Map<string, Span>
8694
runSpanContexts: Map<string, SpanContext>
87-
sessionRunRoots: Map<string, string>
95+
activeRuns: Map<string, string>
96+
assistantRuns: Map<string, string>
97+
pendingRuns: Map<string, PendingRun>
98+
runInputs: Map<string, string>
8899
sessionSpans: Map<string, Span>
89100
sessionSpanContexts: Map<string, SpanContext>
90101
messageSpans: Map<string, Span>
91-
sessionInputs: Map<string, string>
92102
messageOutputs: Map<string, string>
93103
}

src/util.ts

Lines changed: 22 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -23,17 +23,33 @@ export function setBoundedMap<K, V>(map: Map<K, V>, key: K, value: V) {
2323
map.set(key, value)
2424
}
2525

26-
export function resolveSessionTraceContext(sessionID: string, ctx: HandlerContext) {
26+
/** Resolves a root-run context from the live span first, then from the retained ended span context. */
27+
export function resolveRunTraceContext(runID: string, ctx: Pick<HandlerContext, "rootContext" | "runSpans" | "runSpanContexts">) {
28+
const baseCtx = ctx.rootContext()
29+
const runSpan = ctx.runSpans.get(runID)
30+
if (runSpan) return trace.setSpan(baseCtx, runSpan)
31+
const runSpanContext = ctx.runSpanContexts.get(runID)
32+
return runSpanContext ? trace.setSpanContext(baseCtx, runSpanContext) : baseCtx
33+
}
34+
35+
/** Resolves the best available trace parent for a session event or message/tool child span. */
36+
export function resolveSessionTraceContext(
37+
sessionID: string,
38+
ctx: HandlerContext,
39+
input?: { assistantMessageID?: string; runID?: string },
40+
) {
2741
const baseCtx = ctx.rootContext()
2842
const sessionSpan = ctx.sessionSpans.get(sessionID)
2943
if (sessionSpan) return trace.setSpan(baseCtx, sessionSpan)
3044
const sessionSpanContext = ctx.sessionSpanContexts.get(sessionID)
3145
if (sessionSpanContext) return trace.setSpanContext(baseCtx, sessionSpanContext)
32-
const runRootID = ctx.sessionRunRoots.get(sessionID) ?? sessionID
33-
const runSpan = ctx.runSpans.get(runRootID)
34-
if (runSpan) return trace.setSpan(baseCtx, runSpan)
35-
const runSpanContext = ctx.runSpanContexts.get(runRootID)
36-
return runSpanContext ? trace.setSpanContext(baseCtx, runSpanContext) : baseCtx
46+
if (input?.runID) return resolveRunTraceContext(input.runID, ctx)
47+
const assistantRunID = input?.assistantMessageID
48+
? ctx.assistantRuns.get(input.assistantMessageID)
49+
: undefined
50+
if (assistantRunID) return resolveRunTraceContext(assistantRunID, ctx)
51+
const activeRunID = ctx.activeRuns.get(sessionID)
52+
return activeRunID ? resolveRunTraceContext(activeRunID, ctx) : baseCtx
3753
}
3854

3955
/**

tests/handlers/disabled-signals.test.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ describe("disabled traces", () => {
106106
test("disabling all three trace types suppresses every span", async () => {
107107
const { ctx, tracer } = makeCtx("proj_test", [], ["session", "llm", "tool"])
108108
handleSessionCreated(makeSessionCreated("ses_1"), ctx)
109-
startMessageSpan("ses_1", "msg_1", "claude-3-5-sonnet", "anthropic", 1000, ctx)
109+
startMessageSpan("ses_1", "msg_1", "user_1", "claude-3-5-sonnet", "anthropic", 1000, ctx)
110110
await handleMessagePartUpdated(makeToolPartUpdated("running"), ctx)
111111
expect(tracer.spans).toHaveLength(0)
112112
})

0 commit comments

Comments
 (0)