Skip to content

Commit 73ea1e2

Browse files
authored
fix: add fix for negative token values in metrics and add logs to tro… (#3695)
fix: add fix for negative token values in metrics and add logs to troubleshoot workers bottleneck
1 parent fdfa055 commit 73ea1e2

4 files changed

Lines changed: 78 additions & 3 deletions

File tree

services/session-ingest/src/dos/session-metrics.test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,31 @@ describe('computeSessionMetrics', () => {
148148
expect(result.totalCost).toBeCloseTo(0.15);
149149
});
150150

151+
it('clamps negative token and cost totals to 0', () => {
152+
const items = [
153+
makeItem('message', {
154+
role: 'assistant',
155+
time: { created: 1000 },
156+
tokens: {
157+
input: -10,
158+
output: -20,
159+
reasoning: -30,
160+
cache: { read: -40, write: -50 },
161+
},
162+
cost: -0.05,
163+
}),
164+
];
165+
const result = computeSessionMetrics(items, 'completed');
166+
expect(result.totalTokens).toEqual({
167+
input: 0,
168+
output: 0,
169+
reasoning: 0,
170+
cacheRead: 0,
171+
cacheWrite: 0,
172+
});
173+
expect(result.totalCost).toBe(0);
174+
});
175+
151176
it('counts compaction parts', () => {
152177
const items = [
153178
makeItem('part', { type: 'compaction', auto: true }),

services/session-ingest/src/dos/session-metrics.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ function freshAccumulator(): Accumulator {
147147
};
148148
}
149149

150+
function nonNegative(value: number): number {
151+
return Math.max(0, value);
152+
}
153+
150154
function processKiloMeta(acc: Accumulator, raw: unknown) {
151155
const parsed = KiloMetaSchema.safeParse(raw);
152156
if (!parsed.success) return;
@@ -303,8 +307,14 @@ export function computeSessionMetrics(
303307
totalErrors: acc.totalErrors,
304308
errorsByType: acc.errorsByType,
305309
stuckToolCallCount,
306-
totalTokens: acc.totalTokens,
307-
totalCost: acc.totalCost,
310+
totalTokens: {
311+
input: nonNegative(acc.totalTokens.input),
312+
output: nonNegative(acc.totalTokens.output),
313+
reasoning: nonNegative(acc.totalTokens.reasoning),
314+
cacheRead: nonNegative(acc.totalTokens.cacheRead),
315+
cacheWrite: nonNegative(acc.totalTokens.cacheWrite),
316+
},
317+
totalCost: nonNegative(acc.totalCost),
308318
compactionCount: acc.compactionCount,
309319
autoCompactionCount: acc.autoCompactionCount,
310320
terminationReason: closeReason,

services/session-ingest/src/queue-consumer.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ export interface IngestQueueMessage {
2020
ingestedAt: number;
2121
}
2222

23+
function elapsedMs(startedAt: number): number {
24+
return Date.now() - startedAt;
25+
}
26+
2327
/**
2428
* Creates a streaming item extractor that uses a low-level Tokenizer to parse
2529
* items from `$.data[]` one at a time, with a per-item byte budget.
@@ -194,6 +198,7 @@ async function processMessage(
194198
while (pending.length > 0) {
195199
const rawItem = pending.shift();
196200
if (!rawItem) break;
201+
const itemStartedAt = Date.now();
197202
try {
198203
await processItem(
199204
env,
@@ -209,6 +214,7 @@ async function processMessage(
209214
console.error('Error processing single item in queue consumer, continuing', {
210215
r2Key,
211216
type: rawItem['type'],
217+
durationMs: elapsedMs(itemStartedAt),
212218
error: err instanceof Error ? err.message : String(err),
213219
});
214220
}
@@ -253,25 +259,45 @@ async function processItem(
253259

254260
const item = parsed.data;
255261
const { item_id } = getItemIdentity(item);
262+
const startedAt = Date.now();
256263

257264
// Check if item data exceeds DO SQLite row limit (use byte length for non-ASCII safety)
258265
const itemDataJson = JSON.stringify(item.data);
266+
const itemDataBytes = new TextEncoder().encode(itemDataJson).byteLength;
259267
let r2References: Record<string, string> | undefined;
260-
if (new TextEncoder().encode(itemDataJson).byteLength > MAX_INGEST_ITEM_BYTES) {
268+
let r2PutMs = 0;
269+
if (itemDataBytes > MAX_INGEST_ITEM_BYTES) {
261270
const itemR2Key = `items/${kiloUserId}/${sessionId}/${item_id}/${ingestedAt}`;
271+
const r2PutStartedAt = Date.now();
262272
await env.SESSION_INGEST_R2.put(itemR2Key, itemDataJson);
273+
r2PutMs = elapsedMs(r2PutStartedAt);
263274
r2References = { [item_id]: itemR2Key };
264275
}
265276

277+
const doIngestStartedAt = Date.now();
266278
const ingestResult = await withDORetry(
267279
() => getSessionIngestDO(env, { kiloUserId, sessionId }),
268280
stub => stub.ingest([item], kiloUserId, sessionId, ingestVersion, ingestedAt, r2References),
269281
'SessionIngestDO.ingest'
270282
);
283+
const doIngestMs = elapsedMs(doIngestStartedAt);
271284

272285
for (const change of ingestResult.changes) {
273286
mergedChanges.set(change.name, change.value);
274287
}
288+
289+
const durationMs = elapsedMs(startedAt);
290+
console.info('session-ingest processItem timing', {
291+
r2Key,
292+
sessionId,
293+
type: item.type,
294+
itemDataBytes,
295+
r2Referenced: r2References !== undefined,
296+
r2PutMs,
297+
doIngestMs,
298+
durationMs,
299+
changes: ingestResult.changes.length,
300+
});
275301
}
276302

277303
type SessionMetadataUpdates = Partial<
@@ -349,6 +375,7 @@ async function applyMetadataChanges(
349375
mergedChanges.has('gitBranch') ||
350376
parentSessionId !== undefined;
351377

378+
const transactionStartedAt = Date.now();
352379
const notification = await db.transaction(async tx => {
353380
const statusChange =
354381
status === undefined
@@ -454,6 +481,12 @@ async function applyMetadataChanges(
454481
session: mapSessionEventRow(persistedRow),
455482
};
456483
});
484+
const transactionMs = elapsedMs(transactionStartedAt);
485+
console.info('session-ingest applyMetadataChanges transaction timing', {
486+
sessionId,
487+
metadataChanges: mergedChanges.size,
488+
transactionMs,
489+
});
457490

458491
if (!notification) return;
459492

services/session-ingest/src/session-events.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,16 @@ export function notifyUserSessionEvent(
5252
ctx?: { waitUntil(promise: Promise<unknown>): void }
5353
): void {
5454
const notify = async () => {
55+
const startedAt = Date.now();
5556
try {
5657
const stub = getUserConnectionDO(env, { kiloUserId });
5758
await stub.notifySessionEvent(event);
59+
const durationMs = Date.now() - startedAt;
60+
console.info('session-ingest notify timing', {
61+
event: event.type,
62+
sessionId: 'session' in event.data ? event.data.session.sessionId : event.data.sessionId,
63+
durationMs,
64+
});
5865
} catch (error) {
5966
console.error('Failed to notify session event (non-fatal)', {
6067
event: event.type,

0 commit comments

Comments
 (0)