diff --git a/services/session-ingest/src/dos/session-metrics.test.ts b/services/session-ingest/src/dos/session-metrics.test.ts index 82689150f5..c85fbcd845 100644 --- a/services/session-ingest/src/dos/session-metrics.test.ts +++ b/services/session-ingest/src/dos/session-metrics.test.ts @@ -148,6 +148,31 @@ describe('computeSessionMetrics', () => { expect(result.totalCost).toBeCloseTo(0.15); }); + it('clamps negative token and cost totals to 0', () => { + const items = [ + makeItem('message', { + role: 'assistant', + time: { created: 1000 }, + tokens: { + input: -10, + output: -20, + reasoning: -30, + cache: { read: -40, write: -50 }, + }, + cost: -0.05, + }), + ]; + const result = computeSessionMetrics(items, 'completed'); + expect(result.totalTokens).toEqual({ + input: 0, + output: 0, + reasoning: 0, + cacheRead: 0, + cacheWrite: 0, + }); + expect(result.totalCost).toBe(0); + }); + it('counts compaction parts', () => { const items = [ makeItem('part', { type: 'compaction', auto: true }), diff --git a/services/session-ingest/src/dos/session-metrics.ts b/services/session-ingest/src/dos/session-metrics.ts index db2ac9bbdd..7c6f456be1 100644 --- a/services/session-ingest/src/dos/session-metrics.ts +++ b/services/session-ingest/src/dos/session-metrics.ts @@ -147,6 +147,10 @@ function freshAccumulator(): Accumulator { }; } +function nonNegative(value: number): number { + return Math.max(0, value); +} + function processKiloMeta(acc: Accumulator, raw: unknown) { const parsed = KiloMetaSchema.safeParse(raw); if (!parsed.success) return; @@ -303,8 +307,14 @@ export function computeSessionMetrics( totalErrors: acc.totalErrors, errorsByType: acc.errorsByType, stuckToolCallCount, - totalTokens: acc.totalTokens, - totalCost: acc.totalCost, + totalTokens: { + input: nonNegative(acc.totalTokens.input), + output: nonNegative(acc.totalTokens.output), + reasoning: nonNegative(acc.totalTokens.reasoning), + cacheRead: nonNegative(acc.totalTokens.cacheRead), + cacheWrite: nonNegative(acc.totalTokens.cacheWrite), + }, + totalCost: nonNegative(acc.totalCost), compactionCount: acc.compactionCount, autoCompactionCount: acc.autoCompactionCount, terminationReason: closeReason, diff --git a/services/session-ingest/src/queue-consumer.ts b/services/session-ingest/src/queue-consumer.ts index a80a824c99..becbebb223 100644 --- a/services/session-ingest/src/queue-consumer.ts +++ b/services/session-ingest/src/queue-consumer.ts @@ -20,6 +20,10 @@ export interface IngestQueueMessage { ingestedAt: number; } +function elapsedMs(startedAt: number): number { + return Date.now() - startedAt; +} + /** * Creates a streaming item extractor that uses a low-level Tokenizer to parse * items from `$.data[]` one at a time, with a per-item byte budget. @@ -194,6 +198,7 @@ async function processMessage( while (pending.length > 0) { const rawItem = pending.shift(); if (!rawItem) break; + const itemStartedAt = Date.now(); try { await processItem( env, @@ -209,6 +214,7 @@ async function processMessage( console.error('Error processing single item in queue consumer, continuing', { r2Key, type: rawItem['type'], + durationMs: elapsedMs(itemStartedAt), error: err instanceof Error ? err.message : String(err), }); } @@ -253,25 +259,45 @@ async function processItem( const item = parsed.data; const { item_id } = getItemIdentity(item); + const startedAt = Date.now(); // Check if item data exceeds DO SQLite row limit (use byte length for non-ASCII safety) const itemDataJson = JSON.stringify(item.data); + const itemDataBytes = new TextEncoder().encode(itemDataJson).byteLength; let r2References: Record | undefined; - if (new TextEncoder().encode(itemDataJson).byteLength > MAX_INGEST_ITEM_BYTES) { + let r2PutMs = 0; + if (itemDataBytes > MAX_INGEST_ITEM_BYTES) { const itemR2Key = `items/${kiloUserId}/${sessionId}/${item_id}/${ingestedAt}`; + const r2PutStartedAt = Date.now(); await env.SESSION_INGEST_R2.put(itemR2Key, itemDataJson); + r2PutMs = elapsedMs(r2PutStartedAt); r2References = { [item_id]: itemR2Key }; } + const doIngestStartedAt = Date.now(); const ingestResult = await withDORetry( () => getSessionIngestDO(env, { kiloUserId, sessionId }), stub => stub.ingest([item], kiloUserId, sessionId, ingestVersion, ingestedAt, r2References), 'SessionIngestDO.ingest' ); + const doIngestMs = elapsedMs(doIngestStartedAt); for (const change of ingestResult.changes) { mergedChanges.set(change.name, change.value); } + + const durationMs = elapsedMs(startedAt); + console.info('session-ingest processItem timing', { + r2Key, + sessionId, + type: item.type, + itemDataBytes, + r2Referenced: r2References !== undefined, + r2PutMs, + doIngestMs, + durationMs, + changes: ingestResult.changes.length, + }); } type SessionMetadataUpdates = Partial< @@ -349,6 +375,7 @@ async function applyMetadataChanges( mergedChanges.has('gitBranch') || parentSessionId !== undefined; + const transactionStartedAt = Date.now(); const notification = await db.transaction(async tx => { const statusChange = status === undefined @@ -454,6 +481,12 @@ async function applyMetadataChanges( session: mapSessionEventRow(persistedRow), }; }); + const transactionMs = elapsedMs(transactionStartedAt); + console.info('session-ingest applyMetadataChanges transaction timing', { + sessionId, + metadataChanges: mergedChanges.size, + transactionMs, + }); if (!notification) return; diff --git a/services/session-ingest/src/session-events.ts b/services/session-ingest/src/session-events.ts index 2fdb33d06c..61b448bfe8 100644 --- a/services/session-ingest/src/session-events.ts +++ b/services/session-ingest/src/session-events.ts @@ -52,9 +52,16 @@ export function notifyUserSessionEvent( ctx?: { waitUntil(promise: Promise): void } ): void { const notify = async () => { + const startedAt = Date.now(); try { const stub = getUserConnectionDO(env, { kiloUserId }); await stub.notifySessionEvent(event); + const durationMs = Date.now() - startedAt; + console.info('session-ingest notify timing', { + event: event.type, + sessionId: 'session' in event.data ? event.data.session.sessionId : event.data.sessionId, + durationMs, + }); } catch (error) { console.error('Failed to notify session event (non-fatal)', { event: event.type,