Skip to content

Commit 5b4250f

Browse files
fix: isolate usage tests and preserve stream usage context
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)
Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
1 parent 3c856b1 commit 5b4250f

5 files changed

Lines changed: 172 additions & 11 deletions

File tree

src/lib/token-usage/index.ts

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -138,10 +138,16 @@ export function recordTokenUsageEvent(input: TokenUsageEventInput): void {
138138
export function createTokenUsageRecorder(
139139
options: TokenUsageRecorderOptions,
140140
): (usage: UsageTokens) => void {
141+
const store = requestContext.getStore()
142+
const traceId = options.traceId ?? store?.traceId
143+
const sessionId = options.sessionId ?? store?.sessionAffinity
144+
141145
return (usage) => {
142146
recordTokenUsageEvent({
143147
...usage,
144148
...options,
149+
sessionId,
150+
traceId,
145151
})
146152
}
147153
}

src/routes/messages/api-flows.ts

Lines changed: 77 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,8 @@ export interface FlowBaseOptions {
8585
logger: ConsolaInstance
8686
subagentMarker?: SubagentMarker | null
8787
requestId: string
88+
requestSessionAffinity?: string
89+
requestTraceId?: string
8890
sessionId?: string
8991
compactType?: CompactType
9092
}
@@ -103,14 +105,24 @@ export const handleWithChatCompletions = async (
103105
anthropicPayload: AnthropicMessagesPayload,
104106
options: FlowBaseOptions,
105107
) => {
106-
const { logger, subagentMarker, requestId, sessionId, compactType } = options
108+
const {
109+
logger,
110+
requestSessionAffinity,
111+
requestTraceId,
112+
subagentMarker,
113+
requestId,
114+
sessionId,
115+
compactType,
116+
} = options
107117
const openAIPayload = translateToOpenAI(anthropicPayload)
108118
prepareCopilotChatCompletionsPayload(openAIPayload)
109-
const recordUsage = createCopilotUsageRecorder({
119+
const recordUsage = createMessagesFlowUsageRecorder({
120+
anthropicPayload,
110121
endpoint: "chat_completions",
111122
fallbackSessionId: sessionId,
112123
model: openAIPayload.model,
113-
payload: anthropicPayload,
124+
requestSessionAffinity,
125+
requestTraceId,
114126
})
115127
debugJson(logger, "Translated OpenAI request payload:", openAIPayload)
116128

@@ -130,6 +142,23 @@ export const handleWithChatCompletions = async (
130142
})
131143
}
132144

145+
return handleChatCompletionsStream({
146+
c,
147+
logger,
148+
model: openAIPayload.model,
149+
recordUsage,
150+
response,
151+
})
152+
}
153+
154+
const handleChatCompletionsStream = (options: {
155+
c: Context
156+
logger: ConsolaInstance
157+
model: string
158+
recordUsage: (usage: UsageTokens) => void
159+
response: AsyncIterable<{ data?: string }>
160+
}) => {
161+
const { c, logger, model, recordUsage, response } = options
133162
logger.debug("Streaming response from Copilot")
134163
applyForwardableResponseHeaders(c, getAttachedResponseHeaders(response), {
135164
"content-type": null,
@@ -180,7 +209,7 @@ export const handleWithChatCompletions = async (
180209
const premium = await resolvePremiumInfo(response, "messages/chat-stream")
181210
writeStreamLog(
182211
{
183-
model: openAIPayload.model,
212+
model,
184213
chunks: chunkCount,
185214
done: true,
186215
premium,
@@ -226,7 +255,14 @@ export const handleWithResponsesApi = async (
226255
anthropicPayload: AnthropicMessagesPayload,
227256
options: ResponsesFlowOptions,
228257
) => {
229-
const { logger, selectedModel, compactType, ...requestOptions } = options
258+
const {
259+
logger,
260+
requestSessionAffinity,
261+
requestTraceId,
262+
selectedModel,
263+
compactType,
264+
...requestOptions
265+
} = options
230266

231267
const responsesPayload =
232268
translateAnthropicMessagesToResponsesPayload(anthropicPayload)
@@ -235,6 +271,8 @@ export const handleWithResponsesApi = async (
235271
fallbackSessionId: requestOptions.sessionId,
236272
model: responsesPayload.model,
237273
payload: anthropicPayload,
274+
requestSessionAffinity,
275+
requestTraceId,
238276
})
239277

240278
applyResponsesApiContextManagement(
@@ -304,6 +342,8 @@ export const handleWithMessagesApi = async (
304342
const {
305343
logger,
306344
anthropicBetaHeader,
345+
requestSessionAffinity,
346+
requestTraceId,
307347
subagentMarker,
308348
selectedModel,
309349
requestId,
@@ -317,6 +357,8 @@ export const handleWithMessagesApi = async (
317357
fallbackSessionId: sessionId,
318358
model: anthropicPayload.model,
319359
payload: anthropicPayload,
360+
requestSessionAffinity,
361+
requestTraceId,
320362
})
321363

322364
debugJson(logger, "Translated Messages payload:", anthropicPayload)
@@ -475,6 +517,7 @@ const createNativeStreamBody = (options: {
475517
const { body, model, premium, logger, recordUsage } = options
476518
let chunkCount = 0
477519
let usage: UsageTokens = {}
520+
let usageRecorded = false
478521
let buffer = ""
479522
const decoder = new TextDecoder()
480523
const reader = body.getReader()
@@ -491,7 +534,9 @@ const createNativeStreamBody = (options: {
491534
}
492535

493536
writeStreamLog({ model, chunks: chunkCount, done: true, premium }, true)
494-
recordUsage(usage)
537+
if (!usageRecorded) {
538+
recordUsage(usage)
539+
}
495540

496541
controller.close()
497542
return
@@ -526,6 +571,10 @@ const createNativeStreamBody = (options: {
526571
normalizeAnthropicUsage(parsedEvent.usage),
527572
)
528573
}
574+
if (!usageRecorded && parsedEvent?.type === "message_delta") {
575+
recordUsage(usage)
576+
usageRecorded = true
577+
}
529578
return data !== "" && data !== "[DONE]"
530579
}).length
531580
if (newEvents > 0) {
@@ -625,12 +674,33 @@ const createCopilotUsageRecorder = (options: {
625674
fallbackSessionId?: string
626675
model: string
627676
payload: AnthropicMessagesPayload
677+
requestSessionAffinity?: string
678+
requestTraceId?: string
628679
}): ((usage: UsageTokens) => void) =>
629680
createCopilotTokenUsageRecorder({
630681
endpoint: options.endpoint,
631682
fallbackSessionId: options.fallbackSessionId,
632683
model: options.model,
633-
sessionId: getMetadataSessionId(options.payload),
684+
sessionId:
685+
options.requestSessionAffinity ?? getMetadataSessionId(options.payload),
686+
traceId: options.requestTraceId,
687+
})
688+
689+
const createMessagesFlowUsageRecorder = (options: {
690+
anthropicPayload: AnthropicMessagesPayload
691+
endpoint: TokenUsageEndpoint
692+
fallbackSessionId?: string
693+
model: string
694+
requestSessionAffinity?: string
695+
requestTraceId?: string
696+
}) =>
697+
createCopilotUsageRecorder({
698+
endpoint: options.endpoint,
699+
fallbackSessionId: options.fallbackSessionId,
700+
model: options.model,
701+
payload: options.anthropicPayload,
702+
requestSessionAffinity: options.requestSessionAffinity,
703+
requestTraceId: options.requestTraceId,
634704
})
635705

636706
const getMetadataSessionId = (

src/routes/messages/handler.ts

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -109,6 +109,8 @@ export async function handleCompletion(c: Context) {
109109
subagentMarker,
110110
selectedModel,
111111
requestId,
112+
requestSessionAffinity: c.req.header("x-session-affinity"),
113+
requestTraceId: c.req.header("x-trace-id"),
112114
sessionId,
113115
compactType,
114116
logger,
@@ -124,6 +126,8 @@ export async function handleCompletion(c: Context) {
124126
subagentMarker,
125127
selectedModel,
126128
requestId,
129+
requestSessionAffinity: c.req.header("x-session-affinity"),
130+
requestTraceId: c.req.header("x-trace-id"),
127131
sessionId,
128132
compactType,
129133
logger,
@@ -137,6 +141,8 @@ export async function handleCompletion(c: Context) {
137141
{
138142
subagentMarker,
139143
requestId,
144+
requestSessionAffinity: c.req.header("x-session-affinity"),
145+
requestTraceId: c.req.header("x-trace-id"),
140146
sessionId,
141147
compactType,
142148
logger,

tests/native-messages-handler.test.ts

Lines changed: 65 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,13 +11,18 @@ import * as modelsModule from "~/lib/models"
1111
import * as rateLimitModule from "~/lib/rate-limit"
1212
import { attachResponseHeaders } from "~/lib/response-headers"
1313
import { state } from "~/lib/state"
14+
import { closeUsageStore } from "~/lib/token-usage"
15+
import { traceIdMiddleware } from "~/lib/trace"
1416
import {
1517
getInitiatorFromPayload,
1618
isClaudeModel,
1719
} from "~/routes/messages/api-flows"
1820
import { handleCompletion } from "~/routes/messages/handler"
21+
import { tokenUsageRoute } from "~/routes/token-usage/route"
1922
import * as createMessagesModule from "~/services/copilot/create-messages"
2023

24+
const DB_PATH_ENV = "COPILOT_API_SQLITE_DB_PATH"
25+
2126
const createSseResponse = (events: Array<string>): Response => {
2227
const encoder = new TextEncoder()
2328

@@ -296,6 +301,7 @@ describe("native handler", () => {
296301

297302
test("logs anthropic effort mapping for config fallback", async () => {
298303
const app = new Hono()
304+
app.use("*", traceIdMiddleware)
299305
app.post("/v1/messages", (c) => handleCompletion(c))
300306

301307
const res = await app.request("/v1/messages", {
@@ -449,4 +455,63 @@ describe("native handler", () => {
449455
"rem=74.9&rst=2026-04-27T00%3A00%3A00Z",
450456
)
451457
})
458+
459+
test("records usage from native stream message events", async () => {
460+
process.env[DB_PATH_ENV] = ":memory:"
461+
await closeUsageStore()
462+
createMessagesSpy.mockResolvedValueOnce(
463+
createSseResponse([
464+
'event: message_start\ndata: {"type":"message_start","message":{"id":"msg-usage","type":"message","role":"assistant","model":"claude-opus-4","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":37,"output_tokens":1}}}\n\n',
465+
'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"max_tokens","stop_sequence":null},"usage":{"input_tokens":37,"output_tokens":96}}\n\n',
466+
'event: message_stop\ndata: {"type":"message_stop"}\n\n',
467+
]),
468+
)
469+
470+
const app = new Hono()
471+
app.post("/v1/messages", (c) => handleCompletion(c))
472+
app.route("/token-usage", tokenUsageRoute)
473+
474+
const res = await app.request("/v1/messages", {
475+
method: "POST",
476+
headers: {
477+
"Content-Type": "application/json",
478+
"x-session-affinity": "native-stream-test",
479+
"x-trace-id": "native-stream-usage-test",
480+
},
481+
body: JSON.stringify({
482+
model: "claude-opus-4",
483+
stream: true,
484+
max_tokens: 1024,
485+
messages: [{ role: "user", content: "hi" }],
486+
} satisfies AnthropicMessagesPayload),
487+
})
488+
489+
expect(res.status).toBe(200)
490+
await res.text()
491+
492+
const eventsResponse = await app.request(
493+
"/token-usage/events?period=day&page=1&page_size=10",
494+
)
495+
const page = (await eventsResponse.json()) as {
496+
items: Array<{
497+
input_tokens: number
498+
output_tokens: number
499+
session_id: string
500+
total_tokens: number
501+
trace_id: string
502+
}>
503+
}
504+
505+
expect(page.items).toHaveLength(1)
506+
expect(page.items[0]).toMatchObject({
507+
input_tokens: 37,
508+
output_tokens: 96,
509+
session_id: "native-stream-test",
510+
total_tokens: 133,
511+
trace_id: "native-stream-usage-test",
512+
})
513+
514+
await closeUsageStore()
515+
Reflect.deleteProperty(process.env, DB_PATH_ENV)
516+
})
452517
})

tests/responses-handler.test.ts

Lines changed: 18 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,13 @@ import * as createChatCompletionsModule from "~/services/copilot/create-chat-com
3232
import * as createMessagesModule from "~/services/copilot/create-messages"
3333
import * as createResponsesModule from "~/services/copilot/create-responses"
3434

35+
const DB_PATH_ENV = "COPILOT_API_SQLITE_DB_PATH"
36+
37+
const isolateTokenUsageStore = async () => {
38+
process.env[DB_PATH_ENV] = ":memory:"
39+
await closeUsageStore()
40+
}
41+
3542
const responseResult: ResponsesResult = {
3643
id: "resp-1",
3744
object: "response",
@@ -338,6 +345,15 @@ const restoreRoutingTestSpies = (
338345
spies.createMessagesSpy?.mockRestore()
339346
}
340347

348+
beforeEach(async () => {
349+
await isolateTokenUsageStore()
350+
})
351+
352+
afterEach(async () => {
353+
await closeUsageStore()
354+
Reflect.deleteProperty(process.env, DB_PATH_ENV)
355+
})
356+
341357
test("normalizes ANSI-colored info logs before assertions", () => {
342358
expect(
343359
normalizeInfoCall(
@@ -1187,7 +1203,6 @@ describe("responses handler token usage", () => {
11871203
rateLimitWait: state.rateLimitWait,
11881204
verbose: state.verbose,
11891205
}
1190-
const dbPathEnv = "COPILOT_API_SQLITE_DB_PATH"
11911206
const createResponsesMock = mock((() =>
11921207
Promise.resolve(
11931208
createStreamResponse([]),
@@ -1198,8 +1213,7 @@ describe("responses handler token usage", () => {
11981213
let getConfigSpy: ReturnType<typeof spyOn<typeof configModule, "getConfig">>
11991214

12001215
beforeEach(async () => {
1201-
process.env[dbPathEnv] = ":memory:"
1202-
await closeUsageStore()
1216+
await isolateTokenUsageStore()
12031217

12041218
state.copilotToken = "test-token"
12051219
state.manualApprove = false
@@ -1226,7 +1240,7 @@ describe("responses handler token usage", () => {
12261240
createResponsesSpy.mockRestore()
12271241
getConfigSpy.mockRestore()
12281242
await closeUsageStore()
1229-
Reflect.deleteProperty(process.env, dbPathEnv)
1243+
Reflect.deleteProperty(process.env, DB_PATH_ENV)
12301244

12311245
state.copilotToken = originalState.copilotToken
12321246
state.manualApprove = originalState.manualApprove

0 commit comments

Comments
 (0)