Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions services/session-ingest/src/dos/session-metrics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,31 @@ describe('computeSessionMetrics', () => {
expect(result.totalCost).toBeCloseTo(0.15);
});

it('clamps negative token and cost totals to 0', () => {
  // One assistant message whose every usage figure is below zero.
  const negativeUsage = {
    input: -10,
    output: -20,
    reasoning: -30,
    cache: { read: -40, write: -50 },
  };
  const assistantMessage = makeItem('message', {
    role: 'assistant',
    time: { created: 1000 },
    tokens: negativeUsage,
    cost: -0.05,
  });
  const items = [assistantMessage];

  const { totalTokens, totalCost } = computeSessionMetrics(items, 'completed');

  // Each token bucket and the cost must be reported as zero, not the negative sum.
  const zeroedTokens = {
    input: 0,
    output: 0,
    reasoning: 0,
    cacheRead: 0,
    cacheWrite: 0,
  };
  expect(totalTokens).toEqual(zeroedTokens);
  expect(totalCost).toBe(0);
});

it('counts compaction parts', () => {
const items = [
makeItem('part', { type: 'compaction', auto: true }),
Expand Down
14 changes: 12 additions & 2 deletions services/session-ingest/src/dos/session-metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ function freshAccumulator(): Accumulator {
};
}

/**
 * Clamps a numeric total so that it is never reported below zero.
 *
 * Negative inputs (including `-Infinity` and `-0`) and `NaN` all yield `0`;
 * positive inputs, including `Infinity`, are returned unchanged.
 *
 * @param value - The raw total to sanitize.
 * @returns `value` when it is greater than zero, otherwise `0`.
 */
function nonNegative(value: number): number {
  // `Math.max(0, NaN)` evaluates to NaN, which would slip through a clamp
  // built on it. NaN fails the `> 0` comparison, so it maps to 0 here.
  return value > 0 ? value : 0;
}

function processKiloMeta(acc: Accumulator, raw: unknown) {
const parsed = KiloMetaSchema.safeParse(raw);
if (!parsed.success) return;
Expand Down Expand Up @@ -303,8 +307,14 @@ export function computeSessionMetrics(
totalErrors: acc.totalErrors,
errorsByType: acc.errorsByType,
stuckToolCallCount,
totalTokens: acc.totalTokens,
totalCost: acc.totalCost,
totalTokens: {
input: nonNegative(acc.totalTokens.input),
output: nonNegative(acc.totalTokens.output),
reasoning: nonNegative(acc.totalTokens.reasoning),
cacheRead: nonNegative(acc.totalTokens.cacheRead),
cacheWrite: nonNegative(acc.totalTokens.cacheWrite),
},
totalCost: nonNegative(acc.totalCost),
compactionCount: acc.compactionCount,
autoCompactionCount: acc.autoCompactionCount,
terminationReason: closeReason,
Expand Down
35 changes: 34 additions & 1 deletion services/session-ingest/src/queue-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ export interface IngestQueueMessage {
ingestedAt: number;
}

/**
 * Computes how many milliseconds separate `startedAt` from the current time.
 *
 * @param startedAt - A timestamp in epoch milliseconds, as produced by `Date.now()`.
 * @returns The current `Date.now()` reading minus `startedAt`.
 */
function elapsedMs(startedAt: number): number {
  const now = Date.now();
  return now - startedAt;
}

/**
* Creates a streaming item extractor that uses a low-level Tokenizer to parse
* items from `$.data[]` one at a time, with a per-item byte budget.
Expand Down Expand Up @@ -194,6 +198,7 @@ async function processMessage(
while (pending.length > 0) {
const rawItem = pending.shift();
if (!rawItem) break;
const itemStartedAt = Date.now();
try {
await processItem(
env,
Expand All @@ -209,6 +214,7 @@ async function processMessage(
console.error('Error processing single item in queue consumer, continuing', {
r2Key,
type: rawItem['type'],
durationMs: elapsedMs(itemStartedAt),
error: err instanceof Error ? err.message : String(err),
});
}
Expand Down Expand Up @@ -253,25 +259,45 @@ async function processItem(

const item = parsed.data;
const { item_id } = getItemIdentity(item);
const startedAt = Date.now();

// Check if item data exceeds DO SQLite row limit (use byte length for non-ASCII safety)
const itemDataJson = JSON.stringify(item.data);
const itemDataBytes = new TextEncoder().encode(itemDataJson).byteLength;
let r2References: Record<string, string> | undefined;
if (new TextEncoder().encode(itemDataJson).byteLength > MAX_INGEST_ITEM_BYTES) {
let r2PutMs = 0;
if (itemDataBytes > MAX_INGEST_ITEM_BYTES) {
const itemR2Key = `items/${kiloUserId}/${sessionId}/${item_id}/${ingestedAt}`;
const r2PutStartedAt = Date.now();
await env.SESSION_INGEST_R2.put(itemR2Key, itemDataJson);
r2PutMs = elapsedMs(r2PutStartedAt);
r2References = { [item_id]: itemR2Key };
}

const doIngestStartedAt = Date.now();
const ingestResult = await withDORetry(
() => getSessionIngestDO(env, { kiloUserId, sessionId }),
stub => stub.ingest([item], kiloUserId, sessionId, ingestVersion, ingestedAt, r2References),
'SessionIngestDO.ingest'
);
const doIngestMs = elapsedMs(doIngestStartedAt);

for (const change of ingestResult.changes) {
mergedChanges.set(change.name, change.value);
}

const durationMs = elapsedMs(startedAt);
console.info('session-ingest processItem timing', {
r2Key,
sessionId,
type: item.type,
itemDataBytes,
r2Referenced: r2References !== undefined,
r2PutMs,
doIngestMs,
durationMs,
changes: ingestResult.changes.length,
});
}

type SessionMetadataUpdates = Partial<
Expand Down Expand Up @@ -349,6 +375,7 @@ async function applyMetadataChanges(
mergedChanges.has('gitBranch') ||
parentSessionId !== undefined;

const transactionStartedAt = Date.now();
const notification = await db.transaction(async tx => {
const statusChange =
status === undefined
Expand Down Expand Up @@ -454,6 +481,12 @@ async function applyMetadataChanges(
session: mapSessionEventRow(persistedRow),
};
});
const transactionMs = elapsedMs(transactionStartedAt);
console.info('session-ingest applyMetadataChanges transaction timing', {
sessionId,
metadataChanges: mergedChanges.size,
transactionMs,
});

if (!notification) return;

Expand Down
7 changes: 7 additions & 0 deletions services/session-ingest/src/session-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ export function notifyUserSessionEvent(
ctx?: { waitUntil(promise: Promise<unknown>): void }
): void {
const notify = async () => {
const startedAt = Date.now();
try {
const stub = getUserConnectionDO(env, { kiloUserId });
await stub.notifySessionEvent(event);
const durationMs = Date.now() - startedAt;
console.info('session-ingest notify timing', {
event: event.type,
sessionId: 'session' in event.data ? event.data.session.sessionId : event.data.sessionId,
durationMs,
});
} catch (error) {
console.error('Failed to notify session event (non-fatal)', {
event: event.type,
Expand Down