diff --git a/.changeset/context-compaction.md b/.changeset/context-compaction.md new file mode 100644 index 0000000000..aa01920e9a --- /dev/null +++ b/.changeset/context-compaction.md @@ -0,0 +1,26 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server-ui': patch +--- + +Context compaction for the agents runtime. Modelled on OpenAI Codex's +summarization but adapted to the event-sourced timeline (a `context_inserted` +checkpoint placed at a stored watermark, so reconstruction folds older messages +into a summary): + +- A context-window usage gauge (cache-inclusive `context_input_tokens` + + `context_window` persisted per step) and `` notices injected at + 25 / 50 / 75% usage. +- Oversized tool-output truncation, and a synchronous mid-turn compaction floor + at the 90% hard ceiling (runs before every model step via the adapter's + `transformContext` hook). +- Non-blocking background (turn-end) compaction that starts at 85%: a detached + summarize whose checkpoint is applied at the next turn's start, or immediately + if it finishes while idle. Each generation uses a watermark-unique checkpoint + id so a new run can't supersede a prior completed one. Summarize calls are + bounded by a hard timeout. +- UI: a "Compacting…" indicator (blocking vs. background) and a collapsible + "Context compacted" entry in the conversation timeline. + +Thresholds are env-tunable (`ELECTRIC_AGENTS_COMPACT_CEILING`, +`ELECTRIC_AGENTS_COMPACT_BG_CEILING`). diff --git a/packages/agents-runtime/src/client.ts b/packages/agents-runtime/src/client.ts index 55306cc03a..40bcec94e9 100644 --- a/packages/agents-runtime/src/client.ts +++ b/packages/agents-runtime/src/client.ts @@ -46,6 +46,18 @@ export { // drift from the runtime dispatcher. export { isGoalCommandText, parseGoalCommand } from './goal-command' export { formatTokenCount } from './token-budget' +export { + CONTEXT_USAGE_BACKGROUND_START, + CONTEXT_USAGE_HARD_CEILING, + computeContextUsage, + contextUsageLevel, + formatContextUsagePercent, +} from './token-accountant' +export type { + ContextUsage, + ContextUsageInput, + ContextUsageLevel, +} from './token-accountant' export type { GoalCommand } from './goal-command' export type { diff --git a/packages/agents-runtime/src/compaction-midturn.ts b/packages/agents-runtime/src/compaction-midturn.ts new file mode 100644 index 0000000000..b72e305ed0 --- /dev/null +++ b/packages/agents-runtime/src/compaction-midturn.ts @@ -0,0 +1,118 @@ +import { COMPACTION_CHECKPOINT_NAME } from './compaction' +import type { CompactionStatus } from './compaction' + +/** A pi-agent message — loose shape; we only build and slice these. */ +export type AgentMessageLike = { + role: string + content: unknown + timestamp?: number +} + +export interface MidTurnCompactorDeps { + /** Summarize the given (older) messages into a handoff summary string. */ + summarize: (messages: Array) => Promise + /** Persist the checkpoint lifecycle row (UI marker + future reconstruction). */ + writeCheckpoint: (status: CompactionStatus, content: string) => void + /** Compaction fires at/above this fraction of the context window. */ + ceiling: number +} + +export interface CompactContextInput { + messages: Array + /** Estimated tokens of the outgoing context (real last-step usage + tail). */ + currentTokens: number + contextWindow: number +} + +export type CompactContextFn = ( + input: CompactContextInput +) => Promise | null> + +export function buildCompactionSummaryMessage( + summary: string +): AgentMessageLike { + return { + role: `user`, + content: [ + { + type: `text`, + text: `<${COMPACTION_CHECKPOINT_NAME}>\n${summary}\n`, + }, + ], + timestamp: Date.now(), + } +} + +/** + * Build the per-turn mid-turn compaction hook for one agent run. The returned + * function is wired to pi-agent's `transformContext` — it runs before every + * model step. Once the estimated outgoing context crosses the ceiling it folds + * the WHOLE context into a summary and continues from `[summary, ...anything + * appended since]` (Codex-style — no verbatim pre-compaction tail). + * + * Summarizing everything is what keeps the persisted checkpoint sound: the + * `writeCheckpoint` wiring stamps it with `watermark = timeline head`, and a + * verbatim tail kept below that head would be dropped on the next turn's + * reconstruction yet excluded from the summary — silently losing context. By + * covering everything up to the head, summary and watermark agree. + * + * The summary is cached for the rest of the turn: later steps reuse it + * (returning the compacted view) instead of re-summarizing. Coverage is only + * extended once new messages appended this turn push back over the ceiling, and + * a re-summarization chains off the previous summary (prev summary + new + * middle) rather than re-reading the whole already-summarized bulk. + * + * Returns `null` to leave the context untouched (no compaction needed/active). + */ +export function createMidTurnCompactor( + deps: MidTurnCompactorDeps +): CompactContextFn { + let state: { summary: string; coveredCount: number } | null = null + + const compactedView = ( + messages: Array + ): Array | null => + state + ? [ + buildCompactionSummaryMessage(state.summary), + ...messages.slice(state.coveredCount), + ] + : null + + return async ({ messages, currentTokens, contextWindow }) => { + const overCeiling = currentTokens >= deps.ceiling * contextWindow + + // Under the ceiling: keep the compacted view sticky if we already compacted + // this turn, otherwise leave the context untouched. + if (!overCeiling) return compactedView(messages) + + // Fold the ENTIRE current context (everything maps to timeline items at or + // below the head the checkpoint will store), so summary and watermark agree. + const coveredCount = messages.length + + // Nothing new to fold beyond the existing summary's coverage. + if (coveredCount <= 0) return compactedView(messages) + if (state && coveredCount <= state.coveredCount) + return compactedView(messages) + + // Chain off the previous summary so we don't re-summarize the whole bulk. + const toSummarize = state + ? [ + buildCompactionSummaryMessage(state.summary), + ...messages.slice(state.coveredCount, coveredCount), + ] + : messages.slice(0, coveredCount) + + deps.writeCheckpoint(`running`, ``) + try { + const summary = await deps.summarize(toSummarize) + deps.writeCheckpoint(`complete`, summary) + state = { summary, coveredCount } + return compactedView(messages) + } catch { + deps.writeCheckpoint(`failed`, ``) + // Fall back to any compaction we already had (or leave untouched). + return compactedView(messages) + } + } +} diff --git a/packages/agents-runtime/src/compaction-summarize.ts b/packages/agents-runtime/src/compaction-summarize.ts new file mode 100644 index 0000000000..cfa3d9bfcc --- /dev/null +++ b/packages/agents-runtime/src/compaction-summarize.ts @@ -0,0 +1,132 @@ +import { completeSimple } from '@mariozechner/pi-ai' +import { resolvePiModel, toAgentHistory } from './pi-adapter' +import { + COMPACTION_SUMMARIZATION_PROMPT, + COMPACTION_SUMMARY_PREFIX, +} from './compaction' +import type { LLMMessage, SummarizeCompleteFn } from './types' + +export type { SummarizeCompleteFn } + +const DEFAULT_SUMMARY_MAX_TOKENS = 2048 + +/** + * Hard deadline for a single summarization request. + * + * pi-ai's anthropic provider applies a client-side timeout (and abort) ONLY + * when the caller passes `timeoutMs`/`signal`, and it never retries. Background + * compaction fires this call CONCURRENTLY with the agent's own streaming turn on + * the same (OAuth) token; if that concurrent stream stalls, an unbounded call + * hangs forever — wedging the pending slot and blocking all future attempts. + * Bounding it turns a stall into a failure the caller can retry next turn-end. + */ +const DEFAULT_SUMMARY_TIMEOUT_MS = 120_000 + +/** + * Summarize a conversation into a compaction handoff summary. + * + * Uses the conversation's own model by default: a cheaper, small-window model + * would overflow on a near-full context — the whole reason we are compacting. + * The full history is sent followed by Codex's summarization prompt; the summary + * is prefixed with Codex's preamble so the resuming model knows it's a handoff. + */ +interface SummarizeCoreInput { + model: string | object + provider?: string + apiKey?: string + maxTokens?: number + /** Hard deadline for the model call; defaults to {@link DEFAULT_SUMMARY_TIMEOUT_MS}. */ + timeoutMs?: number + complete?: SummarizeCompleteFn +} + +/** + * Core summarization over already-converted history messages (pi-agent's + * `AgentMessage[]` shape). Appends Codex's summarization prompt, calls the + * model, and prefixes the result. Both the LLMMessage path and the mid-turn + * AgentMessage path funnel through here. + */ +async function summarizeConverted( + historyMessages: ReadonlyArray, + input: SummarizeCoreInput +): Promise { + const complete = + input.complete ?? (completeSimple as unknown as SummarizeCompleteFn) + const model = resolvePiModel({ + model: input.model as never, + ...(input.provider && { provider: input.provider as never }), + }) + const context = { + messages: [ + ...historyMessages, + { + role: `user`, + content: COMPACTION_SUMMARIZATION_PROMPT, + timestamp: Date.now(), + }, + ], + } + + // Bound the call: pass `timeoutMs`/`signal` (which the anthropic provider + // honours) AND race against a hard timer, so a stalled stream that ignores the + // abort still rejects rather than hanging the background slot forever. + const timeoutMs = input.timeoutMs ?? DEFAULT_SUMMARY_TIMEOUT_MS + const controller = new AbortController() + let timer: ReturnType | undefined + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => { + controller.abort() + reject(new Error(`[compaction] summarize timed out after ${timeoutMs}ms`)) + }, timeoutMs) + }) + + const call = complete(model, context, { + maxTokens: input.maxTokens ?? DEFAULT_SUMMARY_MAX_TOKENS, + ...(input.apiKey && { apiKey: input.apiKey }), + signal: controller.signal, + timeoutMs, + }) + // If the timeout wins the race, `call` rejects later (aborted) — swallow it so + // the loser doesn't surface as an unhandled rejection. + call.catch(() => {}) + + let res: Awaited> + try { + res = await Promise.race([call, timeout]) + } finally { + if (timer) clearTimeout(timer) + } + + const textBlock = res.content.find((block) => block.type === `text`) + const text = textBlock && `text` in textBlock ? (textBlock.text ?? ``) : `` + if (text.trim().length === 0) { + throw new Error( + `[compaction] empty summary (stopReason=${res.stopReason ?? `none`} error=${res.errorMessage ?? `none`})` + ) + } + + return `${COMPACTION_SUMMARY_PREFIX}\n${text}` +} + +/** + * Summarize a conversation (LLMMessage form) into a compaction handoff summary. + * + * Uses the conversation's own model by default: a cheaper, small-window model + * would overflow on a near-full context — the whole reason we are compacting. + */ +export async function summarizeMessages( + input: SummarizeCoreInput & { messages: ReadonlyArray } +): Promise { + return summarizeConverted(toAgentHistory([...input.messages]), input) +} + +/** + * Summarize already-converted `AgentMessage[]` (what `transformContext` hands + * us mid-turn) — same as `summarizeMessages` but skips the LLMMessage→Agent + * conversion since the messages are already in that shape. + */ +export async function summarizeAgentMessages( + input: SummarizeCoreInput & { messages: ReadonlyArray } +): Promise { + return summarizeConverted(input.messages, input) +} diff --git a/packages/agents-runtime/src/compaction.ts b/packages/agents-runtime/src/compaction.ts new file mode 100644 index 0000000000..c9c1ad5356 --- /dev/null +++ b/packages/agents-runtime/src/compaction.ts @@ -0,0 +1,69 @@ +/** + * Context compaction. + * + * When the conversation approaches the context window, it is summarized into a + * durable "checkpoint" — a `context_inserted` row tagged `kind: "compaction"` — + * and the messages it summarizes are dropped from the reconstructed history. The + * checkpoint carries a watermark: reconstruction hides everything up to it and + * emits the summary in their place (see `timelineMessages`). + */ + +/** `attrs.kind` marking a `context_inserted` row as a compaction checkpoint. */ +export const COMPACTION_CHECKPOINT_KIND = `compaction` + +/** `name` (and thus the rendered tag) for a compaction checkpoint entry. */ +export const COMPACTION_CHECKPOINT_NAME = `compaction_summary` + +/** Stable id for the (single, self-superseding) compaction checkpoint entry. */ +export const COMPACTION_CHECKPOINT_ID = `compaction` + +/** + * Lifecycle of a compaction checkpoint, carried in `attrs.status`: + * - `running` — summarization in flight (UI shows a live "Compacting…" entry) + * - `complete` — summary ready; acts as the timeline watermark + * - `failed` — summarization failed; turn proceeded uncompacted + */ +export type CompactionStatus = `running` | `complete` | `failed` + +/** + * Whether a `context_inserted` row's attrs mark it as a compaction checkpoint + * (any status). + */ +export function isCompactionCheckpointAttrs( + attrs: Record | undefined +): boolean { + return attrs?.kind === COMPACTION_CHECKPOINT_KIND +} + +/** + * Whether attrs mark a *completed* compaction checkpoint — the only state that + * acts as the reconstruction watermark. A `running` (or crashed) checkpoint + * must never hide history. + */ +export function isCompleteCompactionCheckpointAttrs( + attrs: Record | undefined +): boolean { + return ( + attrs?.kind === COMPACTION_CHECKPOINT_KIND && attrs?.status === `complete` + ) +} + +/** + * Summarization prompt, reused verbatim from OpenAI Codex. Appended as a user + * message after the conversation being compacted. + */ +export const COMPACTION_SUMMARIZATION_PROMPT = `You are performing a CONTEXT CHECKPOINT COMPACTION. Create a handoff summary for another LLM that will resume the task. + +Include: +- Current progress and key decisions made +- Important context, constraints, or user preferences +- What remains to be done (clear next steps) +- Any critical data, examples, or references needed to continue + +Be concise, structured, and focused on helping the next LLM seamlessly continue the work.` + +/** + * Prefix prepended to the produced summary when it is reinserted as the + * checkpoint, reused verbatim from Codex. + */ +export const COMPACTION_SUMMARY_PREFIX = `Another language model started to solve this problem and produced a summary of its thinking process. You also have access to the state of the tools that were used by that language model. Use this to build on the work that has already been done and avoid duplicating work. Here is the summary produced by the other language model, use the information in this summary to assist with your own analysis:` diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index bd2812e2fb..38ccb2e5d8 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -17,6 +17,26 @@ import { import { getCronStreamPath } from './cron-utils' import { runtimeLog } from './log' import { sliceChars } from './token-budget' +import { + selectLatestContextUsage, + truncateOversizedToolResults, + withContextBudgetNotice, + CONTEXT_USAGE_HARD_CEILING, + type ContextUsageStep, +} from './token-accountant' +import { + COMPACTION_CHECKPOINT_ID, + COMPACTION_CHECKPOINT_KIND, + COMPACTION_CHECKPOINT_NAME, + type CompactionStatus, +} from './compaction' +import { createMidTurnCompactor } from './compaction-midturn' +import { + summarizeAgentMessages, + summarizeMessages, +} from './compaction-summarize' +import { CONTEXT_USAGE_BACKGROUND_START } from './token-accountant' + import { createContextTools } from './tools/context-tools' import { CACHE_TIERS } from './types' import { composeToolsWithProviders } from './tool-providers' @@ -71,6 +91,12 @@ function agentModelProvider(config: AgentConfig): string { : config.model.provider } +/** Parse a 0..1 ratio from an env string, falling back when invalid. */ +function ratioFromEnv(raw: string | undefined, fallback: number): number { + const value = Number(raw) + return Number.isFinite(value) && value > 0 && value <= 1 ? value : fallback +} + const MAX_HYDRATED_IMAGE_ATTACHMENTS = 4 const MAX_HYDRATED_IMAGE_ATTACHMENT_BYTES = 10 * 1024 * 1024 @@ -146,6 +172,19 @@ export interface HandlerContextConfig { export interface HandlerContextResult { ctx: HandlerContext getSleepRequested: () => boolean + /** + * Background (turn-end) compaction, driven by process-wake. Returns a handle + * whose promise resolves to the summary, or null when compaction isn't + * warranted (low usage / no agent / already compacted). + */ + maybeStartBackgroundCompaction: () => { + watermark: number + promise: Promise + } | null + /** Persist a completed background compaction checkpoint at its watermark. */ + writeBackgroundCheckpoint: (watermark: number, summary: string) => void + /** Mark an in-flight background compaction (identified by its watermark) failed. */ + failBackgroundCheckpoint: (watermark: number) => void } type DebugHandlerContext = @@ -773,36 +812,101 @@ export function createHandlerContext( const composedTools = (await composeToolsWithProviders( activeAgentConfig.tools )) as Array + + // Drives the remaining-budget notice injected below. Synthesized fresh + // from the latest step, never persisted — a self-superseding context row + // would leave misleading load_context_history tombstones. + const budgetUsage = selectLatestContextUsage( + config.db.collections.steps.toArray as ReadonlyArray + ) + + // Mid-turn compaction: runs before every model step (the adapter's + // transformContext) so one tool-heavy turn can't exhaust the window + // before it ends. Overridable via ELECTRIC_AGENTS_COMPACT_CEILING. + const compactCeiling = ratioFromEnv( + process.env.ELECTRIC_AGENTS_COMPACT_CEILING, + CONTEXT_USAGE_HARD_CEILING + ) + const compactProvider = agentModelProvider(activeAgentConfig) + // Watermark for the mid-turn summary, snapshotted when the `running` row + // is written (before the slow summarize await) so an event materializing + // during the summarize can't bump the head past what the summary covered. + let midTurnWatermark: number | undefined + const onCompactContext = createMidTurnCompactor({ + ceiling: compactCeiling, + writeCheckpoint: (status, content) => { + const attrs: Record = { + kind: COMPACTION_CHECKPOINT_KIND, + status, + } + if (status === `running`) { + const head = runtimeTimelineMessages(config.db).reduce( + (max, message) => Math.max(max, message.at), + Number.NEGATIVE_INFINITY + ) + midTurnWatermark = Number.isFinite(head) ? head : undefined + } + // Stamp `complete` with that head: reconstruction folds items <= + // head (what the summary covered) and keeps everything after. Without + // it the watermark falls back to the row's own (later) position and + // drops the most recent messages next turn. + if (status === `complete` && midTurnWatermark !== undefined) { + attrs.watermark = midTurnWatermark + } + contextApi.insertContext(COMPACTION_CHECKPOINT_ID, { + name: COMPACTION_CHECKPOINT_NAME, + attrs, + content, + }) + }, + summarize: async (agentMessages) => { + const apiKey = await activeAgentConfig.getApiKey?.(compactProvider) + return summarizeAgentMessages({ + model: activeAgentConfig.model, + provider: compactProvider, + messages: agentMessages, + ...(apiKey ? { apiKey } : {}), + ...(activeAgentConfig.summarizeComplete + ? { complete: activeAgentConfig.summarizeComplete } + : {}), + }) + }, + }) + + // Cap any single oversized tool result (one huge output can fill the + // window on its own), then surface the budget notice. + const outgoingMessages = withContextBudgetNotice( + truncateOversizedToolResults(messages), + budgetUsage + ) + const adapterFactory = createPiAgentAdapter({ systemPrompt: activeAgentConfig.systemPrompt, model: activeAgentConfig.model, - provider: activeAgentConfig.provider, - tools: [...composedTools, ...extraTools] as Array, - streamFn: activeAgentConfig.streamFn, - getApiKey: activeAgentConfig.getApiKey, - reasoning: activeAgentConfig.reasoning, thinkingBudgets: activeAgentConfig.thinkingBudgets, - onPayload: activeAgentConfig.onPayload, - onStepEnd: activeAgentConfig.onStepEnd, modelTimeoutMs: activeAgentConfig.modelTimeoutMs, modelMaxRetries: activeAgentConfig.modelMaxRetries, + onCompactContext, + ...(budgetUsage + ? { initialContextTokens: budgetUsage.usedTokens } + : {}), }) const handle = adapterFactory({ entityUrl: config.entityUrl, epoch: config.epoch, - messages, + messages: outgoingMessages, outboundIdSeed: await loadOutboundIdSeed(config.db), writeEvent: config.writeEvent, }) - const latestMessageRole = messages.at(-1)?.role + const latestMessageRole = outgoingMessages.at(-1)?.role const runInput = input !== undefined || config.hydratedWebhookSourceWake != null || @@ -815,14 +919,14 @@ export function createHandlerContext( logPrefix, `agent.run starting provider=${agentModelProvider(activeAgentConfig)} ` + `model=${agentModelId(activeAgentConfig.model)} ` + - `messages=${messages.length} latestRole=${latestMessageRole ?? `none`} ` + + `messages=${outgoingMessages.length} latestRole=${latestMessageRole ?? `none`} ` + `wakeType=${config.wakeEvent.type} wakeOffset=${config.wakeOffset} ` + `triggerMessageLen=${messageText.length} ` + `runInputLen=${runInput?.length ?? 0} ` + `tools=${composedTools.length + extraTools.length}` ) - if (messages.length > 0) { - const tail = messages.slice(-3) + if (outgoingMessages.length > 0) { + const tail = outgoingMessages.slice(-3) runtimeLog.info( logPrefix, `agent.run last messages: ${tail @@ -1148,8 +1252,118 @@ export function createHandlerContext( }, } + // ── Background compaction (turn-end, non-blocking) ────────────────────── + // Driven by process-wake: after a turn, maybeStartBackgroundCompaction kicks + // off a detached summarization if usage is high; when it resolves, the NEXT + // turn writes the checkpoint via writeBackgroundCheckpoint (so all event + // writes stay inside a handler invocation). The checkpoint stores the + // snapshot watermark so reconstruction places the summary before any messages + // that arrived while it ran (see timeline-context). + const writeCompactionCheckpoint = ( + status: CompactionStatus, + content: string, + extraAttrs: Record = {}, + id: string = COMPACTION_CHECKPOINT_ID + ): void => { + contextApi.insertContext(id, { + name: COMPACTION_CHECKPOINT_NAME, + attrs: { kind: COMPACTION_CHECKPOINT_KIND, status, ...extraAttrs }, + content, + }) + } + + // Watermark-unique id per background generation, so a new `running` row can't + // supersede the previous `complete` one (supersession keys on id alone). + // running→complete→failed of one generation share the id. + const backgroundCheckpointId = (watermark: number): string => + `${COMPACTION_CHECKPOINT_ID}-bg-${watermark}` + + const latestCompleteCheckpointWatermark = (): number => { + let watermark = Number.NEGATIVE_INFINITY + for (const row of config.db.collections.contextInserted.toArray) { + const r = row as { + name?: string + attrs?: { kind?: string; status?: string; watermark?: unknown } + } + if ( + r.name !== COMPACTION_CHECKPOINT_NAME || + r.attrs?.kind !== COMPACTION_CHECKPOINT_KIND || + r.attrs?.status !== `complete` + ) { + continue + } + const w = + typeof r.attrs.watermark === `number` + ? r.attrs.watermark + : Number(r.attrs.watermark) + if (Number.isFinite(w) && w > watermark) watermark = w + } + return watermark + } + + const maybeStartBackgroundCompaction = (): { + watermark: number + promise: Promise + } | null => { + const cfg = agentConfig + if (!cfg) return null + const usage = selectLatestContextUsage( + config.db.collections.steps.toArray as ReadonlyArray + ) + if (!usage) return null + const ceiling = ratioFromEnv( + process.env.ELECTRIC_AGENTS_COMPACT_BG_CEILING, + CONTEXT_USAGE_BACKGROUND_START + ) + if (usage.ratio < ceiling) return null + const timestamped = runtimeTimelineMessages(config.db) + if (timestamped.length === 0) return null + const head = timestamped.reduce( + (max, message) => Math.max(max, message.at), + Number.NEGATIVE_INFINITY + ) + // Already summarized up to/past the head — nothing new to compact. + if (latestCompleteCheckpointWatermark() >= head) return null + + const messages = timelineToMessages(config.db) + const provider = agentModelProvider(cfg) + // Background-flavored `running` checkpoint for the UI (subtle indicator). + writeCompactionCheckpoint( + `running`, + ``, + { background: true, watermark: head }, + backgroundCheckpointId(head) + ) + const promise = (async () => { + const apiKey = await cfg.getApiKey?.(provider) + return summarizeMessages({ + model: cfg.model, + provider, + messages, + ...(apiKey ? { apiKey } : {}), + ...(cfg.summarizeComplete ? { complete: cfg.summarizeComplete } : {}), + }) + })() + return { watermark: head, promise } + } + return { ctx, getSleepRequested: () => sleepRequested, + maybeStartBackgroundCompaction, + writeBackgroundCheckpoint: (watermark: number, summary: string) => + writeCompactionCheckpoint( + `complete`, + summary, + { background: true, watermark }, + backgroundCheckpointId(watermark) + ), + failBackgroundCheckpoint: (watermark: number) => + writeCompactionCheckpoint( + `failed`, + ``, + { background: true }, + backgroundCheckpointId(watermark) + ), } } diff --git a/packages/agents-runtime/src/entity-schema.ts b/packages/agents-runtime/src/entity-schema.ts index d94b99eac9..2e55fb6307 100644 --- a/packages/agents-runtime/src/entity-schema.ts +++ b/packages/agents-runtime/src/entity-schema.ts @@ -156,15 +156,15 @@ type StepValue = { model_provider?: string model_id?: string duration_ms?: number - // Token usage for this step as reported by the provider's - // end-of-message `usage` payload. Populated on `onStepEnd` when the - // adapter has the data — older events without these fields stay - // valid (both optional), so this is a strictly additive change. - // `input_tokens` is the *uncached* input side (fresh tokens plus - // cache writes; cache reads excluded) — the cache-inclusive total - // would re-count the whole conversation on every step. + // Uncached input side (fresh tokens + cache writes; cache reads excluded, so + // it doesn't re-count the conversation every warm turn). input_tokens?: number output_tokens?: number + // Cache-inclusive prompt size (input + cacheRead + cacheWrite) — how much of + // the window the request occupied; the number a fullness gauge needs. + context_input_tokens?: number + // The model's context window for this step. + context_window?: number } type TextValue = { key?: string diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 61093c87b4..8222347ab1 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -360,6 +360,27 @@ export { export type { GoalCommand, GoalDispatchResult } from './goal-command' export { assembleContext } from './context-assembly' export { approxTokens, formatTokenCount, sliceChars } from './token-budget' +export { + CONTEXT_USAGE_AWARENESS_THRESHOLDS, + CONTEXT_USAGE_BACKGROUND_START, + CONTEXT_USAGE_HARD_CEILING, + computeContextUsage, + contextUsageLevel, + formatContextUsagePercent, + selectLatestContextUsage, + shouldSurfaceContextBudget, + formatContextBudgetNotice, + buildContextBudgetNotice, + withContextBudgetNotice, + truncateOversizedToolResults, + CONTEXT_TOOL_OUTPUT_MAX_TOKENS, +} from './token-accountant' +export type { + ContextUsage, + ContextUsageInput, + ContextUsageLevel, + ContextUsageStep, +} from './token-accountant' export { createContextTools } from './tools/context-tools' export { completeWithLowCostModel, diff --git a/packages/agents-runtime/src/outbound-bridge.ts b/packages/agents-runtime/src/outbound-bridge.ts index 87c902df93..9d589f34f9 100644 --- a/packages/agents-runtime/src/outbound-bridge.ts +++ b/packages/agents-runtime/src/outbound-bridge.ts @@ -156,6 +156,12 @@ export interface OutboundBridge { // persisted to the step row — forwarded to hooks for budget accounting. tokenInputUncached?: number tokenOutput?: number + // Cache-inclusive prompt size (`input + cacheRead + cacheWrite`), + // persisted to the step as `context_input_tokens` for the context-usage + // gauge. Distinct from `tokenInput`, which excludes cache reads. + tokenContext?: number + // Model context window for this step, persisted as `context_window`. + contextWindow?: number durationMs?: number }) => void onTextStart: () => void @@ -323,6 +329,8 @@ export function createOutboundBridge( tokenInput?: number tokenInputUncached?: number tokenOutput?: number + tokenContext?: number + contextWindow?: number durationMs?: number }) { if (!currentStepKey) return @@ -343,6 +351,12 @@ export function createOutboundBridge( ...(opts?.tokenOutput !== undefined && { output_tokens: opts.tokenOutput, }), + ...(opts?.tokenContext !== undefined && { + context_input_tokens: opts.tokenContext, + }), + ...(opts?.contextWindow !== undefined && { + context_window: opts.contextWindow, + }), } as never, }) as ChangeEvent ) diff --git a/packages/agents-runtime/src/pi-adapter.ts b/packages/agents-runtime/src/pi-adapter.ts index 8b8d59eff0..863c584386 100644 --- a/packages/agents-runtime/src/pi-adapter.ts +++ b/packages/agents-runtime/src/pi-adapter.ts @@ -12,6 +12,8 @@ import { getModel, streamSimple } from '@mariozechner/pi-ai' import { createOutboundBridge } from './outbound-bridge' import { MOONSHOT_PROVIDER, getMoonshotModel } from './moonshot-models' import { runtimeLog } from './log' +import { approxTokens } from './token-budget' +import type { AgentMessageLike, CompactContextFn } from './compaction-midturn' import { ModelProviderError, toModelProviderError, @@ -85,6 +87,14 @@ export interface PiAdapterOptions { }) => void modelTimeoutMs?: number modelMaxRetries?: number + // Mid-turn compaction hook. Called before each model step with the outgoing + // messages; may return a compacted message list to send instead. The adapter + // supplies `currentTokens` (real last-step usage + estimated trailing) and the + // model's context window so the hook can decide. See createMidTurnCompactor. + onCompactContext?: CompactContextFn + // Real cache-inclusive token usage entering this run (the previous turn's + // last step), used to anchor the first step's token estimate. + initialContextTokens?: number } const DEFAULT_MODEL_TIMEOUT_MS = 30_000 @@ -289,6 +299,53 @@ export function createPiAgentAdapter( maxRetries: modelMaxRetries, }) + // Mid-turn compaction token accounting (Codex-style): anchor on the real + // cache-inclusive usage reported for the last model step, plus an estimate + // of only the items appended since (the trailing tail). `anchorMessageCount` + // marks the message-array length at that last step so the trailing slice can + // be measured. Initialised from the previous turn's usage for the first call. + const estimateContent = (m: AgentMessageLike): number => + approxTokens((m as { content?: unknown }).content) + let anchorTokens = + opts.initialContextTokens ?? + (history as Array).reduce( + (sum, m) => sum + estimateContent(m), + 0 + ) + let anchorMessageCount = history.length + let pendingRequestMessageCount = anchorMessageCount + + const modelContextWindow = + typeof model.contextWindow === `number` ? model.contextWindow : 0 + + const transformContext = + opts.onCompactContext && modelContextWindow > 0 + ? async ( + messages: Array + ): Promise> => { + const list = messages as unknown as Array + const trailingTokens = list + .slice(anchorMessageCount) + .reduce((sum, m) => sum + estimateContent(m), 0) + const currentTokens = anchorTokens + trailingTokens + // Anchor the NEXT step's trailing estimate on this step's *incoming* + // (uncompacted) message count — NOT the compacted list we may return. + // pi-agent hands transformContext the full conversation every step, + // so `list.slice(anchorMessageCount)` next step measures exactly the + // messages appended since. `anchorTokens` is separately re-anchored + // to the step's real cache-inclusive usage at message_end. + pendingRequestMessageCount = messages.length + const compacted = await opts.onCompactContext!({ + messages: list, + currentTokens, + contextWindow: modelContextWindow, + }) + return compacted + ? (compacted as unknown as Array) + : messages + } + : undefined + const agentOptions = { initialState: { systemPrompt: opts.systemPrompt, @@ -297,6 +354,7 @@ export function createPiAgentAdapter( model, }, streamFn, + ...(transformContext && { transformContext }), ...(opts.getApiKey && { getApiKey: opts.getApiKey }), ...(opts.onPayload && { onPayload: opts.onPayload }), } @@ -531,6 +589,30 @@ export function createPiAgentAdapter( : typeof usage?.outputTokens === `number` ? usage.outputTokens : undefined + // Cache-INCLUSIVE prompt size: every token the request put in + // the context window, including prompt-cache reads. This is + // what a "% of context used" gauge needs — cached tokens still + // occupy the window even though `usageInput` excludes them for + // budget accounting. + const usageContext = + sumPresentNumbers([ + usage?.input, + usage?.cacheWrite, + usage?.cacheRead, + ]) ?? + (typeof usage?.inputTokens === `number` + ? usage.inputTokens + : undefined) + const contextWindow = + typeof model.contextWindow === `number` + ? model.contextWindow + : undefined + // Re-anchor the mid-turn token estimate on this step's real + // (cache-inclusive) usage and the message count we sent. + if (usageContext !== undefined) { + anchorTokens = usageContext + anchorMessageCount = pendingRequestMessageCount + } bridge.onStepEnd({ finishReason, durationMs: Date.now() - stepStartTime, @@ -541,6 +623,10 @@ export function createPiAgentAdapter( ...(usageOutput !== undefined && { tokenOutput: usageOutput, }), + ...(usageContext !== undefined && { + tokenContext: usageContext, + }), + ...(contextWindow !== undefined && { contextWindow }), }) if (isError) { diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index 5ce8e2d062..abeb3bf2f1 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -591,6 +591,17 @@ export async function processWake( let idleTimer: ReturnType | null = null let idleController: AbortController | null = null let runAbortController: AbortController | null = null + // Turn-end background compaction: a detached summarize, applied as a + // checkpoint at the next turn's start — or immediately if it settles while + // idle. Persists across loop iterations. + let pendingBackgroundCompaction: { + watermark: number + status: `pending` | `ready` | `failed` + summary: string + } | null = null + // Set when the detached summarize settles while idle; tells the idle loop to + // apply the checkpoint without running the handler. + let backgroundApplyRequestedDuringIdle = false let activeSignalHandler: | (( signal: Pick @@ -2058,6 +2069,16 @@ export async function processWake( }) armIdleTimer() queueNextWakeIfReady() + // A background compaction may have settled in the gap between the + // post-handler start and entering idle; wake immediately to apply it. + if ( + queuedNextWake === null && + pendingBackgroundCompaction && + pendingBackgroundCompaction.status !== `pending` + ) { + backgroundApplyRequestedDuringIdle = true + idleController.abort() + } await idleWait acceptLiveInputs = false @@ -2162,7 +2183,13 @@ export async function processWake( pendingRunAbortRequested = false runAbortController.abort() } - const { ctx: handlerCtx, getSleepRequested } = createHandlerContext({ + const { + ctx: handlerCtx, + getSleepRequested, + maybeStartBackgroundCompaction, + writeBackgroundCheckpoint, + failBackgroundCheckpoint, + } = createHandlerContext({ entityUrl, entityType: typeName, epoch, @@ -2241,10 +2268,65 @@ export async function processWake( `payloadType=${typeof currentWakeEvent.payload} ` + `firstWake=${initialFirstWake && !setupComplete}` ) + // Apply a ready (or failed) background compaction from a prior turn + // *before* the handler reconstructs its context, so this turn starts + // compacted. Always inside a handler invocation → normal write/flush. + if ( + pendingBackgroundCompaction && + pendingBackgroundCompaction.status !== `pending` + ) { + if (pendingBackgroundCompaction.status === `ready`) { + writeBackgroundCheckpoint( + pendingBackgroundCompaction.watermark, + pendingBackgroundCompaction.summary + ) + } else { + failBackgroundCheckpoint(pendingBackgroundCompaction.watermark) + } + pendingBackgroundCompaction = null + } + const handlerT0 = performance.now() await definition.handler(handlerCtx, currentWakeEvent) handlerMs += +(performance.now() - handlerT0).toFixed(2) log.info(`handler returned`) + + // Kick off a turn-end background compaction if usage is high and none is + // already pending. Detached: the summarize runs off the critical path + // and its checkpoint is written at the next turn's start (above). + if (!pendingBackgroundCompaction) { + const started = maybeStartBackgroundCompaction() + if (started) { + const slot: { + watermark: number + status: `pending` | `ready` | `failed` + summary: string + } = { watermark: started.watermark, status: `pending`, summary: `` } + pendingBackgroundCompaction = slot + const onSettled = (): void => { + // If the summarize finished while we're idle (no turn to apply it + // at), wake the idle loop so the checkpoint is written now and the + // indicator clears. Otherwise the next turn's pre-handler applies. + if (acceptLiveInputs && pendingBackgroundCompaction === slot) { + backgroundApplyRequestedDuringIdle = true + idleController?.abort() + } + } + started.promise + .then((summary) => { + slot.status = `ready` + slot.summary = summary + onSettled() + }) + .catch((err) => { + slot.status = `failed` + log.warn( + `background compaction failed: ${toError(err).message}` + ) + onSettled() + }) + } + } await waitForSignalHandlers() activeSignalHandler = null await waitForSharedStateWiring() @@ -2377,10 +2459,43 @@ export async function processWake( break } - const resumed = await awaitIdleForFreshWork( - `handler returned, entering idle (${idleTimeout / 1000}s timeout)` - ) - if (resumed) { + // Idle until a fresh wake resumes us or we genuinely go idle — re-entering + // each time a detached background compaction settles while idle, so its + // checkpoint is written (without an agent run) and the indicator doesn't + // linger. + let resumedFromIdle = false + let idleReason = `handler returned` + let appliedBackgroundDuringIdle = false + do { + appliedBackgroundDuringIdle = false + const resumed = await awaitIdleForFreshWork( + `${idleReason}, entering idle (${idleTimeout / 1000}s timeout)` + ) + if (resumed) { + resumedFromIdle = true + break + } + const settled = backgroundApplyRequestedDuringIdle + ? pendingBackgroundCompaction + : null + backgroundApplyRequestedDuringIdle = false + if (settled && settled.status !== `pending`) { + if (settled.status === `ready`) { + writeBackgroundCheckpoint(settled.watermark, settled.summary) + log.info(`background compaction applied during idle`) + } else { + failBackgroundCheckpoint(settled.watermark) + log.info(`background compaction failed; cleared during idle`) + } + pendingBackgroundCompaction = null + await drainAllPendingWrites() + await wakeSession.commitManifestEntries() + await flushProducedWrites() + idleReason = `background compaction applied` + appliedBackgroundDuringIdle = true + } + } while (appliedBackgroundDuringIdle) + if (resumedFromIdle) { continue } break diff --git a/packages/agents-runtime/src/timeline-context.ts b/packages/agents-runtime/src/timeline-context.ts index 461430da4a..8c5498f720 100644 --- a/packages/agents-runtime/src/timeline-context.ts +++ b/packages/agents-runtime/src/timeline-context.ts @@ -22,6 +22,10 @@ import type { TimestampedMessage, } from './types' import { COMPOSER_INPUT_MESSAGE_TYPE } from './composer-input' +import { + isCompactionCheckpointAttrs, + isCompleteCompactionCheckpointAttrs, +} from './compaction' function asString(value: unknown, fallback = ``): string { if (typeof value === `string`) { @@ -468,10 +472,44 @@ export function timelineMessages( const attachmentsByInboxKey = attachmentsBySubjectInboxKey(db) const messages: Array = [] + // A completed compaction checkpoint summarizes everything up to its watermark. + // Items below the watermark are dropped; the summary is rendered AT the + // watermark — so it precedes any messages that arrived during a *background* + // compaction (physically written before the checkpoint row, but logically + // after the watermark). The watermark is the checkpoint's stored + // `attrs.watermark` (captured when background compaction snapshotted the + // head), falling back to the row's own order for synchronous compaction. Only + // `complete` checkpoints count, so a `running`/crashed one never hides history. + const activeCheckpoint = latestCompleteCompactionCheckpoint(items) + const compactionWatermark = activeCheckpoint + ? compactionWatermarkOf(activeCheckpoint) + : Number.NEGATIVE_INFINITY + let summaryEmitted = false + const emitSummary = (): void => { + if (!activeCheckpoint || summaryEmitted) return + for (const message of projection(activeCheckpoint) ?? []) { + messages.push({ ...message, at: compactionWatermark }) + } + summaryEmitted = true + } + for (const item of items) { if (item.at < since) { continue } + if (item.at <= compactionWatermark) { + continue // folded into the summary + } + // Place the summary at the watermark boundary, before the first kept item. + emitSummary() + // All compaction checkpoint rows are UI-only here (the active summary is + // emitted above); skip them from the model context. + if ( + item.kind === `context_inserted` && + isCompactionCheckpointAttrs(item.attrs) + ) { + continue + } for (const message of projection(item) ?? []) { messages.push({ @@ -480,10 +518,52 @@ export function timelineMessages( }) } } + // A checkpoint with no kept items after it (e.g. a `since` filter past the + // head) still surfaces its summary. + emitSummary() return messages } +/** + * The newest live *completed* compaction checkpoint item, or null. `running`/ + * `failed` checkpoints are ignored so an in-flight or crashed compaction never + * hides history. + */ +function latestCompleteCompactionCheckpoint( + items: ReadonlyArray +): Extract | null { + let latest: Extract | null = null + for (const item of items) { + if ( + item.kind === `context_inserted` && + !item.superseded && + isCompleteCompactionCheckpointAttrs(item.attrs) && + (latest === null || item.at > latest.at) + ) { + latest = item + } + } + return latest +} + +/** + * Logical watermark for a compaction checkpoint: the stored `attrs.watermark` + * (set by background compaction at snapshot time) if valid, else the row's own + * order (synchronous compaction writes at the point it summarizes). + */ +function compactionWatermarkOf( + checkpoint: Extract +): number { + const stored = checkpoint.attrs?.watermark + if (typeof stored === `number` && Number.isFinite(stored)) return stored + if (typeof stored === `string` && stored.length > 0) { + const parsed = Number(stored) + if (Number.isFinite(parsed)) return parsed + } + return checkpoint.at +} + export function timelineToMessages(db: EntityStreamDB): Array { return timelineMessages(db).map( ({ at: _at, ...message }) => message as LLMMessage diff --git a/packages/agents-runtime/src/token-accountant.ts b/packages/agents-runtime/src/token-accountant.ts new file mode 100644 index 0000000000..bd0b268970 --- /dev/null +++ b/packages/agents-runtime/src/token-accountant.ts @@ -0,0 +1,210 @@ +/** + * Token accounting for context-window usage — the single source of truth for + * turning a step's reported tokens into a "% of the context window used" figure. + * The runtime triggers and the UI gauge both compute through here, so the number + * the user sees is the number compaction acts on. + */ + +import { approxTokens, formatTokenCount } from './token-budget' +import type { LLMMessage } from './types' + +/** + * Fractions of the context window at which compaction changes behaviour, kept + * here so the UI gauge and the runtime triggers share one set of numbers. + * + * - `AWARENESS` (25/50/75%): inject a budget notice so the model can pace itself. + * - `BACKGROUND_START` (85%): kick off background compaction. + * - `HARD_CEILING` (90%): compact synchronously before the next model call. + * Matches Codex's auto-compaction threshold. + */ +export const CONTEXT_USAGE_AWARENESS_THRESHOLDS = [0.25, 0.5, 0.75] as const +export const CONTEXT_USAGE_BACKGROUND_START = 0.85 +export const CONTEXT_USAGE_HARD_CEILING = 0.9 + +export interface ContextUsageInput { + /** + * Cache-inclusive prompt size of the most recent request — every token the + * request occupied in the context window (`input + cacheRead + cacheWrite`). + * Persisted as `context_input_tokens` on the step. + */ + contextInputTokens: number + /** + * Output tokens of that same step. They re-enter the prompt on the next turn, + * so counting them makes the gauge reflect how full the window will be going + * into the next request rather than lagging a turn behind. + */ + outputTokens?: number + /** The model's context window (`context_window` on the step). */ + contextWindow: number +} + +export interface ContextUsage { + /** Estimated tokens occupying the window now (input + output of last step). */ + usedTokens: number + contextWindow: number + /** `usedTokens / contextWindow`, clamped to [0, 1]. */ + ratio: number +} + +/** + * Compute context-window usage from a step's reported tokens. Returns `null` + * when the context window is unknown or non-positive (e.g. a provider that + * didn't report it), so callers can hide the gauge rather than divide by zero. + */ +export function computeContextUsage( + input: ContextUsageInput +): ContextUsage | null { + if (!Number.isFinite(input.contextWindow) || input.contextWindow <= 0) { + return null + } + const usedTokens = Math.max( + 0, + input.contextInputTokens + (input.outputTokens ?? 0) + ) + const ratio = Math.min(1, usedTokens / input.contextWindow) + return { usedTokens, contextWindow: input.contextWindow, ratio } +} + +export type ContextUsageLevel = `normal` | `warning` | `critical` + +/** + * Severity bucket for a usage ratio, aligned to the compaction thresholds: + * `warning` once background compaction would start (85%), `critical` at the + * hard ceiling (90%). + */ +export function contextUsageLevel(ratio: number): ContextUsageLevel { + if (ratio >= CONTEXT_USAGE_HARD_CEILING) return `critical` + if (ratio >= CONTEXT_USAGE_BACKGROUND_START) return `warning` + return `normal` +} + +/** Render a usage ratio as a whole-percent label, e.g. `42%`. */ +export function formatContextUsagePercent(ratio: number): string { + return `${Math.round(ratio * 100)}%` +} + +/** + * Minimal shape of a persisted step row needed to derive context usage — the + * cache-inclusive prompt size, the step's output, the model window, and `_seq` + * (the collection's monotonic insertion order) to find the most recent one. + */ +export interface ContextUsageStep { + _seq?: number + context_input_tokens?: number + context_window?: number + output_tokens?: number +} + +/** + * Pick the most recent step that reported context usage and compute its usage. + * The latest step of the latest run carries the whole conversation, so its + * cache-inclusive prompt size is the best estimate of current fullness. Returns + * `null` when no step has reported usage yet (e.g. the very first turn). + */ +export function selectLatestContextUsage( + steps: ReadonlyArray +): ContextUsage | null { + let latest: ContextUsageStep | null = null + for (const step of steps) { + if ( + typeof step.context_window !== `number` || + step.context_window <= 0 || + typeof step.context_input_tokens !== `number` + ) { + continue + } + if (!latest || (step._seq ?? 0) > (latest._seq ?? 0)) { + latest = step + } + } + if (!latest) return null + return computeContextUsage({ + contextInputTokens: latest.context_input_tokens as number, + outputTokens: latest.output_tokens, + contextWindow: latest.context_window as number, + }) +} + +/** + * Whether to show the model a budget notice — once usage reaches the first + * awareness threshold (25%). Below that the window is empty enough that a + * reminder is just noise. + */ +export function shouldSurfaceContextBudget(ratio: number): boolean { + return ratio >= CONTEXT_USAGE_AWARENESS_THRESHOLDS[0] +} + +/** + * The human-readable body of the model-facing budget notice — remaining tokens + * plus the percentage left. Recomputed every turn from the latest step, so it + * is always current rather than a stale snapshot from when a threshold was + * first crossed. + */ +export function formatContextBudgetNotice(usage: ContextUsage): string { + const remaining = Math.max(0, usage.contextWindow - usage.usedTokens) + const percentLeft = Math.max(0, Math.round((1 - usage.ratio) * 100)) + return `You have about ${formatTokenCount( + remaining + )} tokens (${percentLeft}%) of the context window remaining.` +} + +/** Tag wrapping the budget notice, mirroring Codex's ``. */ +const CONTEXT_BUDGET_NOTICE_TAG = `token_budget` + +/** The model-facing budget notice as a (user-role) message. */ +export function buildContextBudgetNotice(usage: ContextUsage): LLMMessage { + return { + role: `user`, + content: `<${CONTEXT_BUDGET_NOTICE_TAG}>\n${formatContextBudgetNotice( + usage + )}\n`, + } +} + +/** + * Return `messages` with a current context-budget notice injected, or unchanged + * when usage is unknown or below the first awareness threshold. The notice is + * placed just before the final message so the closing turn (and any + * last-message inspection downstream) is preserved. + */ +export function withContextBudgetNotice( + messages: ReadonlyArray, + usage: ContextUsage | null +): Array { + if (!usage || !shouldSurfaceContextBudget(usage.ratio)) { + return [...messages] + } + const notice = buildContextBudgetNotice(usage) + if (messages.length === 0) return [notice] + return [...messages.slice(0, -1), notice, messages[messages.length - 1]!] +} + +/** + * Default cap on a single tool result's size before it is truncated. One giant + * tool output (a huge file read, a verbose command) can fill the window on its + * own; capping each result keeps any single one bounded. Mirrors Codex's + * per-message truncation. + */ +export const CONTEXT_TOOL_OUTPUT_MAX_TOKENS = 10_000 + +/** + * Replace any single `tool_result` whose content exceeds `maxTokens` with a + * visible placeholder. Truncation is explicit (never silent) and leaves the + * tool-call pairing intact (`toolCallId` / `isError` are preserved). Other + * message roles pass through untouched. + */ +export function truncateOversizedToolResults( + messages: ReadonlyArray, + maxTokens: number = CONTEXT_TOOL_OUTPUT_MAX_TOKENS +): Array { + return messages.map((message) => { + if (message.role !== `tool_result`) return message + if (approxTokens(message.content) <= maxTokens) return message + return { + ...message, + content: `[Output truncated: exceeded ${formatTokenCount( + maxTokens + )} tokens and was removed to fit the context window]`, + } + }) +} diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 7e4759ff8e..8cee5d4722 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -976,8 +976,28 @@ export interface AgentConfig { modelTimeoutMs?: number modelMaxRetries?: number testResponses?: TestResponses + // Model-call seam for context-compaction summarization. Defaults to pi-ai's + // completeSimple (i.e. the conversation model). Injected by tests, and a hook + // for routing summarization to a different model later. + summarizeComplete?: SummarizeCompleteFn } +/** + * Model-call seam for compaction summarization. Typed loosely (only the text + * content is needed back) so it matches pi-ai's `completeSimple` without + * importing its types here. Defined in types.ts to keep `compaction-summarize` + * → `types` a one-way dependency. + */ +export type SummarizeCompleteFn = ( + model: unknown, + context: { messages: Array; systemPrompt?: string }, + options?: Record +) => Promise<{ + content: Array<{ type: string; text?: string }> + stopReason?: string + errorMessage?: string +}> + export type TestResponses = Array | TestResponseFn export type TestResponseFn = ( diff --git a/packages/agents-runtime/test/compaction-background.test.ts b/packages/agents-runtime/test/compaction-background.test.ts new file mode 100644 index 0000000000..733235bf70 --- /dev/null +++ b/packages/agents-runtime/test/compaction-background.test.ts @@ -0,0 +1,134 @@ +import { describe, expect, it } from 'vitest' +import { + buildStreamFixture, + createTestHandlerContext, +} from './helpers/context-test-helpers' +import type { ChangeEvent } from '@durable-streams/state' + +function seedStep( + db: ReturnType, + contextInputTokens: number +): void { + ;( + db.collections as unknown as { steps: { insert: (r: unknown) => void } } + ).steps.insert({ + key: `step-1`, + _seq: 1, + run_id: `r`, + step_number: 1, + status: `completed`, + context_input_tokens: contextInputTokens, + context_window: 200000, + }) +} + +function compactionAttrs(event: ChangeEvent): + | { + kind?: string + status?: string + background?: boolean + watermark?: unknown + } + | undefined { + return (event.value as { attrs?: { kind?: string } } | undefined) + ?.attrs as never +} + +describe(`background compaction (turn-end)`, () => { + it(`starts when usage is high: summarizes + writes a background running checkpoint`, async () => { + const db = buildStreamFixture( + Array.from({ length: 6 }, (_, i) => ({ + kind: `inbox` as const, + at: i + 1, + value: { payload: `MSG_${i}` }, + })) + ) + seedStep(db, 190000) // 95% of the 200k window ≥ 85% background start + + const writes: Array = [] + const res = createTestHandlerContext({ + db, + writeEvent: (event) => { + writes.push(event) + db.utils.applyEvent(event) + }, + }) + res.ctx.useAgent({ + systemPrompt: `t`, + model: `claude-sonnet-4-5-20250929`, + provider: `anthropic`, + tools: [], + summarizeComplete: async () => ({ + content: [{ type: `text`, text: `BG_SUMMARY` }], + }), + }) + + const handle = res.maybeStartBackgroundCompaction() + expect(handle).not.toBeNull() + + const summary = await handle!.promise + expect(summary).toContain(`BG_SUMMARY`) + + // A background-flavored running checkpoint was written for the UI. + const idOf = (e: ChangeEvent): string | undefined => + (e.value as { id?: string } | undefined)?.id + const runningWrite = writes.find((e) => { + const a = compactionAttrs(e) + return a?.status === `running` && a?.background === true + }) + expect(runningWrite).toBeDefined() + // …under a watermark-unique id, so the NEXT generation's `running` can't + // supersede this generation's `complete`. + expect(idOf(runningWrite!)).toBe(`compaction-bg-${handle!.watermark}`) + + // Applying the result writes a complete checkpoint carrying the watermark, + // under the SAME generation id (so it supersedes its own running row). + res.writeBackgroundCheckpoint(handle!.watermark, summary) + const completeWrite = writes.find( + (e) => compactionAttrs(e)?.status === `complete` + ) + expect(completeWrite).toBeDefined() + expect(compactionAttrs(completeWrite!)?.watermark).toBe(handle!.watermark) + expect(idOf(completeWrite!)).toBe(`compaction-bg-${handle!.watermark}`) + }) + + it(`skips when usage is below the background threshold`, () => { + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `hi` } }, + ]) + seedStep(db, 50000) // 25% + const res = createTestHandlerContext({ db }) + res.ctx.useAgent({ + systemPrompt: `t`, + model: `claude-sonnet-4-5-20250929`, + provider: `anthropic`, + tools: [], + }) + expect(res.maybeStartBackgroundCompaction()).toBeNull() + }) + + it(`skips when already compacted up to the head`, () => { + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `hi` } }, + { + kind: `context_inserted`, + at: 2, + value: { + id: `compaction`, + name: `compaction_summary`, + attrs: { kind: `compaction`, status: `complete`, watermark: 99999 }, + content: `S`, + }, + }, + ]) + seedStep(db, 190000) + const res = createTestHandlerContext({ db }) + res.ctx.useAgent({ + systemPrompt: `t`, + model: `claude-sonnet-4-5-20250929`, + provider: `anthropic`, + tools: [], + }) + expect(res.maybeStartBackgroundCompaction()).toBeNull() + }) +}) diff --git a/packages/agents-runtime/test/compaction-midturn.test.ts b/packages/agents-runtime/test/compaction-midturn.test.ts new file mode 100644 index 0000000000..93ca06c97a --- /dev/null +++ b/packages/agents-runtime/test/compaction-midturn.test.ts @@ -0,0 +1,122 @@ +import { describe, expect, it, vi } from 'vitest' +import { createMidTurnCompactor } from '../src/compaction-midturn' +import type { AgentMessageLike } from '../src/compaction-midturn' + +function msgs(n: number): Array { + return Array.from({ length: n }, (_, i) => ({ + role: i % 2 ? `assistant` : `user`, + content: `m${i}`, + })) +} + +describe(`createMidTurnCompactor`, () => { + it(`leaves context untouched below the ceiling`, async () => { + const summarize = vi.fn() + const writeCheckpoint = vi.fn() + const compact = createMidTurnCompactor({ + summarize, + writeCheckpoint, + ceiling: 0.9, + }) + const out = await compact({ + messages: msgs(10), + currentTokens: 1000, // 10% of window + contextWindow: 10000, + }) + expect(out).toBeNull() + expect(summarize).not.toHaveBeenCalled() + }) + + it(`compacts over the ceiling: summarizes the WHOLE context, continues from [summary]`, async () => { + const summarize = vi.fn().mockResolvedValue(`SUMMARY`) + const statuses: Array = [] + const writeCheckpoint = vi.fn((s: string) => statuses.push(s)) + const compact = createMidTurnCompactor({ + summarize, + writeCheckpoint, + ceiling: 0.9, + }) + const messages = msgs(10) + const out = await compact({ + messages, + currentTokens: 9500, // 95% + contextWindow: 10000, + }) + // The entire context is folded (no verbatim tail kept), so the summary and + // the checkpoint's timeline-head watermark agree. + expect(summarize).toHaveBeenCalledTimes(1) + expect((summarize.mock.calls[0]![0] as Array).length).toBe(10) + expect(statuses).toEqual([`running`, `complete`]) + expect(out).not.toBeNull() + expect(out!.length).toBe(1) // summary only + expect(JSON.stringify(out![0])).toContain(`SUMMARY`) + }) + + it(`is sticky: reuses the cached summary and appends messages added since`, async () => { + const summarize = vi.fn().mockResolvedValue(`SUMMARY`) + const compact = createMidTurnCompactor({ + summarize, + writeCheckpoint: vi.fn(), + ceiling: 0.9, + }) + const messages = msgs(10) + await compact({ messages, currentTokens: 9500, contextWindow: 10000 }) + summarize.mockClear() + const grown = [...messages, { role: `user`, content: `new` }] + const out = await compact({ + messages: grown, + currentTokens: 2000, // 20%, well under ceiling + contextWindow: 10000, + }) + expect(summarize).not.toHaveBeenCalled() // reused, not re-summarized + expect(JSON.stringify(out![0])).toContain(`SUMMARY`) + expect(out!.length).toBe(1 + 1) // summary + the one message added since + expect(out![1]).toBe(grown[10]) + }) + + it(`re-summarizes by chaining off the previous summary`, async () => { + const summarize = vi + .fn() + .mockResolvedValueOnce(`SUMMARY1`) + .mockResolvedValueOnce(`SUMMARY2`) + const compact = createMidTurnCompactor({ + summarize, + writeCheckpoint: vi.fn(), + ceiling: 0.9, + }) + await compact({ + messages: msgs(10), + currentTokens: 9500, + contextWindow: 10000, + }) + const out = await compact({ + messages: msgs(20), + currentTokens: 9500, + contextWindow: 10000, + }) + expect(summarize).toHaveBeenCalledTimes(2) + // The re-summarization input is [prior summary, ...the messages added since] + // (chained, not the whole already-summarized bulk re-read). + const secondInput = summarize.mock.calls[1]![0] as Array + expect(secondInput.length).toBe(1 + 10) // SUMMARY1 + msgs 10..19 + expect(JSON.stringify(secondInput[0])).toContain(`SUMMARY1`) + expect(JSON.stringify(out![0])).toContain(`SUMMARY2`) + }) + + it(`on failure writes "failed" and leaves context untouched`, async () => { + const summarize = vi.fn().mockRejectedValue(new Error(`boom`)) + const statuses: Array = [] + const compact = createMidTurnCompactor({ + summarize, + writeCheckpoint: (s: string) => statuses.push(s), + ceiling: 0.9, + }) + const out = await compact({ + messages: msgs(10), + currentTokens: 9500, + contextWindow: 10000, + }) + expect(statuses).toEqual([`running`, `failed`]) + expect(out).toBeNull() // no prior compaction → untouched + }) +}) diff --git a/packages/agents-runtime/test/compaction-summarize.test.ts b/packages/agents-runtime/test/compaction-summarize.test.ts new file mode 100644 index 0000000000..6928de71c6 --- /dev/null +++ b/packages/agents-runtime/test/compaction-summarize.test.ts @@ -0,0 +1,83 @@ +import { describe, expect, it } from 'vitest' +import { summarizeMessages } from '../src/compaction-summarize' +import type { SummarizeCompleteFn } from '../src/compaction-summarize' +import type { LLMMessage } from '../src/types' + +const messages: Array = [ + { role: `user`, content: `please build a thing` }, + { role: `assistant`, content: `working on it` }, +] + +describe(`summarizeMessages`, () => { + it(`sends history + the summarization prompt and prefixes the result`, async () => { + let sawPrompt = false + let sawHistory = false + const complete: SummarizeCompleteFn = async (_model, context) => { + // History content is normalized into text blocks, so search the + // serialized messages rather than assuming a string content. + const serialized = JSON.stringify(context.messages) + sawHistory = serialized.includes(`please build a thing`) + sawPrompt = serialized.includes(`CONTEXT CHECKPOINT COMPACTION`) + // the prompt must be the LAST message + expect(JSON.stringify(context.messages.at(-1))).toContain( + `CONTEXT CHECKPOINT COMPACTION` + ) + return { content: [{ type: `text`, text: `SUMMARY_BODY` }] } + } + + const out = await summarizeMessages({ + model: `claude-sonnet-4-5-20250929`, + messages, + complete, + }) + + expect(sawHistory).toBe(true) + expect(sawPrompt).toBe(true) + // Codex summary preamble is prepended… + expect(out).toContain( + `Another language model started to solve this problem` + ) + // …followed by the model's summary body. + expect(out).toContain(`SUMMARY_BODY`) + }) + + it(`rejects (does not hang) when the model call stalls past the timeout`, async () => { + // A stalled stream that never resolves — the real failure mode that wedged + // background compaction. The timeout must turn it into a rejection. + let aborted = false + const complete: SummarizeCompleteFn = (_model, _context, options) => + // Never settles on its own — models a stalled stream that ignores abort, + // so only the hard timer can break the wait. + new Promise(() => { + const signal = (options as { signal?: AbortSignal } | undefined)?.signal + signal?.addEventListener(`abort`, () => { + aborted = true + }) + }) + + await expect( + summarizeMessages({ + model: `claude-sonnet-4-5-20250929`, + messages, + complete, + timeoutMs: 20, + }) + ).rejects.toThrow(/timed out after 20ms/) + // The caller is also signalled to abort so the underlying fetch can unwind. + expect(aborted).toBe(true) + }) + + it(`throws when the model returns an empty summary`, async () => { + const complete: SummarizeCompleteFn = async () => ({ + content: [{ type: `text`, text: ` ` }], + stopReason: `stop`, + }) + await expect( + summarizeMessages({ + model: `claude-sonnet-4-5-20250929`, + messages, + complete, + }) + ).rejects.toThrow(/empty summary/) + }) +}) diff --git a/packages/agents-runtime/test/compaction-trigger.test.ts b/packages/agents-runtime/test/compaction-trigger.test.ts new file mode 100644 index 0000000000..db0127d992 --- /dev/null +++ b/packages/agents-runtime/test/compaction-trigger.test.ts @@ -0,0 +1,144 @@ +import { afterEach, describe, expect, it } from 'vitest' +import { createAssistantMessageEventStream } from '@mariozechner/pi-ai' +import { + buildStreamFixture, + createTestHandlerContext, +} from './helpers/context-test-helpers' +import type { ChangeEvent } from '@durable-streams/state' + +// The mid-turn compactor gates on (real last-step usage + trailing estimate) vs +// the MODEL's context window. We can't shrink the real window in a unit test, so +// we drive the trigger with the env override + a high seeded anchor instead. +const savedEnv = { + ceiling: process.env.ELECTRIC_AGENTS_COMPACT_CEILING, +} +afterEach(() => { + process.env.ELECTRIC_AGENTS_COMPACT_CEILING = savedEnv.ceiling +}) + +function completedAssistantMessage(): unknown { + return { + role: `assistant`, + content: [{ type: `text`, text: `ok` }], + api: `anthropic-messages`, + provider: `anthropic`, + model: `claude-sonnet-4-5-20250929`, + usage: { + input: 1, + output: 1, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 2, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: `stop`, + timestamp: Date.now(), + } +} + +function seedStep( + db: ReturnType, + contextInputTokens: number +): void { + ;( + db.collections as unknown as { steps: { insert: (r: unknown) => void } } + ).steps.insert({ + key: `step-1`, + _seq: 1, + run_id: `r`, + step_number: 1, + status: `completed`, + context_input_tokens: contextInputTokens, + context_window: 200000, + }) +} + +describe(`mid-turn compaction trigger`, () => { + it(`compacts mid-turn: summarizer runs, model sees the summary, checkpoint persisted`, async () => { + // Ceiling tiny so any real model window is crossed by the seeded anchor. + process.env.ELECTRIC_AGENTS_COMPACT_CEILING = `0.0001` + + // Enough messages that there's real content beyond the kept tail (6). + const db = buildStreamFixture( + Array.from({ length: 8 }, (_, i) => ({ + kind: `inbox` as const, + at: i + 1, + value: { payload: `MESSAGE_${i}` }, + })) + ) + seedStep(db, 50_000) // high anchor → over the tiny ceiling + + const writes: Array = [] + const { ctx } = createTestHandlerContext({ + db, + writeEvent: (event: ChangeEvent) => { + writes.push(event) + db.utils.applyEvent(event) + }, + }) + + let captured = `` + let summarizeCalled = false + ctx.useAgent({ + systemPrompt: `test`, + model: `claude-sonnet-4-5-20250929`, + tools: [], + summarizeComplete: async () => { + summarizeCalled = true + return { content: [{ type: `text`, text: `COMPACTED_SUMMARY` }] } + }, + streamFn: ((_model: unknown, context: unknown) => { + captured = JSON.stringify(context) + const stream = createAssistantMessageEventStream() + queueMicrotask(() => stream.end(completedAssistantMessage() as never)) + return stream + }) as never, + }) + + await ctx.agent.run(`continue`) + + expect(summarizeCalled).toBe(true) + expect(captured).toContain(`COMPACTED_SUMMARY`) + const statuses = writes + .map( + (event) => + ( + event.value as + | { attrs?: { kind?: string; status?: string } } + | undefined + )?.attrs + ) + .filter((attrs) => attrs?.kind === `compaction`) + .map((attrs) => attrs?.status) + expect(statuses).toContain(`running`) + expect(statuses).toContain(`complete`) + }) + + it(`does not compact when well under the ceiling`, async () => { + // Default ceiling (0.9); a small anchor against the real ~200k+ window. + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `hello` } }, + ]) + seedStep(db, 500) + + let summarizeCalled = false + const { ctx } = createTestHandlerContext({ db }) + ctx.useAgent({ + systemPrompt: `test`, + model: `claude-sonnet-4-5-20250929`, + tools: [], + summarizeComplete: async () => { + summarizeCalled = true + return { content: [{ type: `text`, text: `X` }] } + }, + streamFn: ((_model: unknown) => { + const stream = createAssistantMessageEventStream() + queueMicrotask(() => stream.end(completedAssistantMessage() as never)) + return stream + }) as never, + }) + + await ctx.agent.run(`continue`) + expect(summarizeCalled).toBe(false) + }) +}) diff --git a/packages/agents-runtime/test/context-factory.test.ts b/packages/agents-runtime/test/context-factory.test.ts index 9749dcd22b..e8e4f7a4b8 100644 --- a/packages/agents-runtime/test/context-factory.test.ts +++ b/packages/agents-runtime/test/context-factory.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it, vi } from 'vitest' +import { createAssistantMessageEventStream } from '@mariozechner/pi-ai' import { getCronStreamPath } from '../src/cron-utils' import { createHandlerContext } from '../src/context-factory' import { ENTITY_COLLECTIONS } from '../src/entity-schema' @@ -495,4 +496,148 @@ describe(`createHandlerContext`, () => { ], }) }) + + // === Phase 1: budget notice end-to-end through the real runAgent path === + // Drives an actual agent run with a capturing streamFn and asserts the + // `` notice does (or doesn't) reach the model's context, + // gated on a seeded step's usage. This is the "prompt a model and check + // the notice is inserted" verification, made deterministic. + + function seedStep( + db: EntityStreamDBWithActions, + step: { + _seq: number + context_input_tokens: number + context_window: number + output_tokens?: number + } + ): void { + ;( + db.collections as unknown as { steps: { insert: (r: unknown) => void } } + ).steps.insert({ + key: `step-${step._seq}`, + run_id: `run-1`, + step_number: 1, + status: `completed`, + ...step, + }) + } + + function completedAssistantMessage(): unknown { + return { + role: `assistant`, + content: [{ type: `text`, text: `ok` }], + api: `anthropic-messages`, + provider: `anthropic`, + model: `claude-sonnet-4-5-20250929`, + usage: { + input: 10, + output: 5, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 15, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: `stop`, + timestamp: Date.now(), + } + } + + async function captureModelContext( + db: EntityStreamDBWithActions + ): Promise { + const { ctx } = createHandlerContext({ + entityUrl: `/chat/test`, + entityType: `chat`, + epoch: 1, + wakeOffset: `-1`, + firstWake: false, + args: {}, + db, + sandbox: testSandboxStub, + state: {}, + actions: {}, + electricTools: [], + events: [] as Array, + writeEvent: vi.fn(), + wakeSession: { + getPhase: () => `active`, + registerManifestEntry: vi.fn(() => true), + removeManifestEntry: vi.fn(() => false), + commitManifestEntries: vi.fn(), + rollbackManifestEntries: vi.fn(), + registerSharedStateHandle: vi.fn(), + registerSpawnHandle: vi.fn(), + registerSourceHandle: vi.fn(), + enqueueSend: vi.fn(), + getManifest: vi.fn(() => []), + getPendingSends: vi.fn(() => []), + getSharedStateHandles: vi.fn(() => new Map()), + getSpawnHandles: vi.fn(() => new Map()), + getSourceHandles: vi.fn(() => new Map()), + finishSetup: vi.fn(() => ({ + manifest: [], + sharedStateHandles: new Map(), + spawnHandles: new Map(), + sourceHandles: new Map(), + })), + close: vi.fn(), + } as any, + wakeEvent: { + type: `inbox`, + payload: `hello`, + fromOffset: 0, + toOffset: 0, + eventCount: 1, + } as any, + doObserve: vi.fn(), + doSpawn: vi.fn(), + doFork: vi.fn(), + doMkdb: vi.fn(), + executeSend: vi.fn(), + tags: {}, + doSetTag: vi.fn().mockResolvedValue(undefined), + doDeleteTag: vi.fn().mockResolvedValue(undefined), + doUnobserve: vi.fn().mockResolvedValue(undefined), + }) + + let captured: unknown = null + ctx.useAgent({ + systemPrompt: `test`, + model: `claude-sonnet-4-5-20250929`, + tools: [], + streamFn: ((_model: unknown, context: unknown) => { + captured = context + const stream = createAssistantMessageEventStream() + queueMicrotask(() => stream.end(completedAssistantMessage() as never)) + return stream + }) as never, + }) + + await ctx.agent.run(`hello`) + return JSON.stringify(captured) + } + + it(`injects the token budget notice into the model context at >=25% usage`, async () => { + const db = createMockDb([]) + seedStep(db, { + _seq: 1, + context_input_tokens: 80_000, + context_window: 100_000, + }) + const contextJson = await captureModelContext(db) + expect(contextJson).toContain(`token_budget`) + expect(contextJson).toContain(`remaining`) + }) + + it(`does not inject the notice below 25% usage`, async () => { + const db = createMockDb([]) + seedStep(db, { + _seq: 1, + context_input_tokens: 10_000, + context_window: 100_000, + }) + const contextJson = await captureModelContext(db) + expect(contextJson).not.toContain(`token_budget`) + }) }) diff --git a/packages/agents-runtime/test/outbound-bridge.test.ts b/packages/agents-runtime/test/outbound-bridge.test.ts index 62fb5d75b3..a620869475 100644 --- a/packages/agents-runtime/test/outbound-bridge.test.ts +++ b/packages/agents-runtime/test/outbound-bridge.test.ts @@ -320,6 +320,44 @@ describe(`createOutboundBridge`, () => { expect(value.output_tokens).toBe(567) }) + it(`onStepEnd surfaces context usage and window for the gauge`, () => { + const writes: Array = [] + const bridge = createOutboundBridge([], (e) => { + writes.push(e) + }) + + bridge.onRunStart() + bridge.onStepStart({ modelId: `gpt-4` }) + bridge.onStepEnd({ + finishReason: `stop`, + tokenInput: 1234, + tokenOutput: 567, + // Cache-inclusive prompt size — larger than `tokenInput`, which + // excludes cache reads. + tokenContext: 90_000, + contextWindow: 128_000, + }) + + const value = writes[writes.length - 1]!.value as Record + expect(value.context_input_tokens).toBe(90_000) + expect(value.context_window).toBe(128_000) + }) + + it(`onStepEnd omits context columns when not reported`, () => { + const writes: Array = [] + const bridge = createOutboundBridge([], (e) => { + writes.push(e) + }) + + bridge.onRunStart() + bridge.onStepStart({ modelId: `gpt-4` }) + bridge.onStepEnd({ finishReason: `stop`, tokenInput: 42 }) + + const value = writes[writes.length - 1]!.value as Record + expect(`context_input_tokens` in value).toBe(false) + expect(`context_window` in value).toBe(false) + }) + it(`onStepEnd omits token columns when a side is undefined`, () => { const writes: Array = [] const bridge = createOutboundBridge([], (e) => { diff --git a/packages/agents-runtime/test/pi-adapter.test.ts b/packages/agents-runtime/test/pi-adapter.test.ts index f8767f86c4..4398df2d2b 100644 --- a/packages/agents-runtime/test/pi-adapter.test.ts +++ b/packages/agents-runtime/test/pi-adapter.test.ts @@ -1088,5 +1088,30 @@ describe(`toAgentHistory`, () => { expect(stepValue?.input_tokens).toBe(100) expect(`output_tokens` in (stepValue ?? {})).toBe(false) }) + + it(`emits the cache-INCLUSIVE prompt size as context_input_tokens`, async () => { + // The context-usage gauge needs every token occupying the window, + // including prompt-cache reads — unlike `input_tokens`, which + // excludes cacheRead for budget accounting. Same split as the test + // above: 50 + 1200 + 100 = 1350, vs the uncached input_tokens of 150. + const events = await runOnce( + makeCompletedMessage({ + input: 50, + cacheRead: 1200, + cacheWrite: 100, + output: 80, + }) + ) + const stepValue = findStepUpdate(events) + expect(stepValue?.context_input_tokens).toBe(1350) + expect(stepValue?.input_tokens).toBe(150) + }) + + it(`emits the model context window on the step`, async () => { + const events = await runOnce(makeCompletedMessage({ input: 100 })) + const stepValue = findStepUpdate(events) + expect(typeof stepValue?.context_window).toBe(`number`) + expect(stepValue?.context_window as number).toBeGreaterThan(0) + }) }) }) diff --git a/packages/agents-runtime/test/timeline-compaction.test.ts b/packages/agents-runtime/test/timeline-compaction.test.ts new file mode 100644 index 0000000000..27f1983b9b --- /dev/null +++ b/packages/agents-runtime/test/timeline-compaction.test.ts @@ -0,0 +1,201 @@ +import { describe, expect, it } from 'vitest' +import { timelineToMessages } from '../src/timeline-context' +import { buildStreamFixture } from './helpers/context-test-helpers' + +function serialize(db: ReturnType): string { + return JSON.stringify(timelineToMessages(db)) +} + +describe(`timelineMessages compaction reconstruction`, () => { + it(`hides items before a compaction checkpoint and keeps the summary + later items`, () => { + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `FIRST_MESSAGE` } }, + { kind: `inbox`, at: 2, value: { payload: `SECOND_MESSAGE` } }, + { + kind: `context_inserted`, + at: 3, + value: { + id: `compaction`, + name: `compaction_summary`, + attrs: { kind: `compaction`, status: `complete` }, + content: `SUMMARY_OF_EARLIER_WORK`, + }, + }, + { kind: `inbox`, at: 4, value: { payload: `LATEST_MESSAGE` } }, + ]) + + const out = serialize(db) + // Everything before the checkpoint is summarized away… + expect(out).not.toContain(`FIRST_MESSAGE`) + expect(out).not.toContain(`SECOND_MESSAGE`) + // …replaced by the summary, with post-checkpoint messages kept verbatim. + expect(out).toContain(`SUMMARY_OF_EARLIER_WORK`) + expect(out).toContain(`LATEST_MESSAGE`) + }) + + it(`places the summary at the stored watermark, before during-compaction messages`, () => { + // Background compaction snapshotted W=2 (after SECOND), then a prompt+answer + // arrived (at 3,4) while summarizing, and the checkpoint was written LATE + // (at 5) with attrs.watermark=2. The summary must render BEFORE the + // during-compaction messages, not at its physical (late) position. + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `FIRST_MESSAGE` } }, + { kind: `inbox`, at: 2, value: { payload: `SECOND_MESSAGE` } }, + { kind: `inbox`, at: 3, value: { payload: `DURING_PROMPT` } }, + { kind: `inbox`, at: 4, value: { payload: `DURING_ANSWER` } }, + { + kind: `context_inserted`, + at: 5, + value: { + id: `compaction`, + name: `compaction_summary`, + attrs: { kind: `compaction`, status: `complete`, watermark: 2 }, + content: `SUMMARY_OF_EARLIER_WORK`, + }, + }, + ]) + + const out = serialize(db) + expect(out).not.toContain(`FIRST_MESSAGE`) + expect(out).not.toContain(`SECOND_MESSAGE`) + // Summary first, then the during-compaction prompt + answer verbatim. + const iSummary = out.indexOf(`SUMMARY_OF_EARLIER_WORK`) + const iPrompt = out.indexOf(`DURING_PROMPT`) + const iAnswer = out.indexOf(`DURING_ANSWER`) + expect(iSummary).toBeGreaterThanOrEqual(0) + expect(iSummary).toBeLessThan(iPrompt) + expect(iPrompt).toBeLessThan(iAnswer) + }) + + it(`mid-turn checkpoint keeps messages produced after compaction (no tail loss)`, () => { + // Regression: the mid-turn (sync) checkpoint summarizes the WHOLE context and + // is stamped with watermark = timeline head at compaction (here 3). Messages + // written *after* it — the model's post-compaction output + the next prompt — + // must survive. Previously the checkpoint carried no watermark, so it fell + // back to its own (later) row order and silently dropped recent messages. + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `EARLY_ONE` } }, + { kind: `inbox`, at: 2, value: { payload: `EARLY_TWO` } }, + { kind: `inbox`, at: 3, value: { payload: `EARLY_THREE` } }, + { + kind: `context_inserted`, + at: 4, + value: { + id: `compaction`, + name: `compaction_summary`, + attrs: { kind: `compaction`, status: `complete`, watermark: 3 }, + content: `MIDTURN_SUMMARY`, + }, + }, + { kind: `inbox`, at: 5, value: { payload: `POST_COMPACTION_ANSWER` } }, + { kind: `inbox`, at: 6, value: { payload: `NEXT_PROMPT` } }, + ]) + + const out = serialize(db) + // Everything <= head is folded into the summary… + expect(out).not.toContain(`EARLY_ONE`) + expect(out).not.toContain(`EARLY_THREE`) + expect(out).toContain(`MIDTURN_SUMMARY`) + // …and everything written after compaction is kept verbatim. + expect(out).toContain(`POST_COMPACTION_ANSWER`) + expect(out).toContain(`NEXT_PROMPT`) + }) + + it(`does NOT hide history for a running (incomplete) checkpoint`, () => { + // Crash-safety: an in-flight/crashed compaction must never drop history. + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `FIRST_MESSAGE` } }, + { + kind: `context_inserted`, + at: 2, + value: { + id: `compaction`, + name: `compaction_summary`, + attrs: { kind: `compaction`, status: `running` }, + content: ``, + }, + }, + { kind: `inbox`, at: 3, value: { payload: `SECOND_MESSAGE` } }, + ]) + + const out = serialize(db) + expect(out).toContain(`FIRST_MESSAGE`) + expect(out).toContain(`SECOND_MESSAGE`) + // The running checkpoint is a UI-only marker — not rendered to the model. + expect(out).not.toContain(`compaction_summary`) + }) + + it(`keeps applying a completed background checkpoint after the NEXT background starts`, () => { + // Regression: every background generation has a watermark-unique id, so a + // fresh `running` checkpoint (next generation) must NOT supersede the prior + // `complete` one. With the old shared id, the running row erased the + // complete watermark and reconstruction stopped compacting entirely. + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `FIRST_MESSAGE` } }, + { kind: `inbox`, at: 2, value: { payload: `SECOND_MESSAGE` } }, + { + kind: `context_inserted`, + at: 3, + value: { + id: `compaction-bg-2`, + name: `compaction_summary`, + attrs: { kind: `compaction`, status: `complete`, watermark: 2 }, + content: `SUMMARY_OF_EARLIER_WORK`, + }, + }, + { kind: `inbox`, at: 4, value: { payload: `LATEST_MESSAGE` } }, + // The next background generation kicks off — different id, so it must not + // clobber the complete above. + { + kind: `context_inserted`, + at: 5, + value: { + id: `compaction-bg-4`, + name: `compaction_summary`, + attrs: { kind: `compaction`, status: `running`, watermark: 4 }, + content: ``, + }, + }, + ]) + + const out = serialize(db) + // The completed checkpoint still compacts away the early messages… + expect(out).not.toContain(`FIRST_MESSAGE`) + expect(out).not.toContain(`SECOND_MESSAGE`) + expect(out).toContain(`SUMMARY_OF_EARLIER_WORK`) + expect(out).toContain(`LATEST_MESSAGE`) + }) + + it(`is a no-op when there is no compaction checkpoint`, () => { + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `FIRST_MESSAGE` } }, + { kind: `inbox`, at: 2, value: { payload: `SECOND_MESSAGE` } }, + ]) + + const out = serialize(db) + expect(out).toContain(`FIRST_MESSAGE`) + expect(out).toContain(`SECOND_MESSAGE`) + }) + + it(`ignores a non-compaction context entry (no watermark)`, () => { + const db = buildStreamFixture([ + { kind: `inbox`, at: 1, value: { payload: `FIRST_MESSAGE` } }, + { + kind: `context_inserted`, + at: 2, + value: { + id: `note`, + name: `note`, + attrs: {}, + content: `JUST_A_NOTE`, + }, + }, + { kind: `inbox`, at: 3, value: { payload: `SECOND_MESSAGE` } }, + ]) + + const out = serialize(db) + // A plain context entry must NOT act as a compaction watermark. + expect(out).toContain(`FIRST_MESSAGE`) + expect(out).toContain(`SECOND_MESSAGE`) + }) +}) diff --git a/packages/agents-runtime/test/token-accountant.test.ts b/packages/agents-runtime/test/token-accountant.test.ts new file mode 100644 index 0000000000..2ddfa97ca7 --- /dev/null +++ b/packages/agents-runtime/test/token-accountant.test.ts @@ -0,0 +1,220 @@ +import { describe, expect, it } from 'vitest' +import { + CONTEXT_USAGE_BACKGROUND_START, + CONTEXT_USAGE_HARD_CEILING, + computeContextUsage, + contextUsageLevel, + formatContextUsagePercent, + formatContextBudgetNotice, + selectLatestContextUsage, + shouldSurfaceContextBudget, + withContextBudgetNotice, + truncateOversizedToolResults, +} from '../src/token-accountant' +import type { LLMMessage } from '../src/types' + +describe(`computeContextUsage`, () => { + it(`sums input + output against the window`, () => { + const usage = computeContextUsage({ + contextInputTokens: 40_000, + outputTokens: 10_000, + contextWindow: 100_000, + }) + expect(usage).toEqual({ + usedTokens: 50_000, + contextWindow: 100_000, + ratio: 0.5, + }) + }) + + it(`treats output as optional`, () => { + const usage = computeContextUsage({ + contextInputTokens: 25_000, + contextWindow: 100_000, + }) + expect(usage?.usedTokens).toBe(25_000) + expect(usage?.ratio).toBe(0.25) + }) + + it(`clamps the ratio to 1 when over the window`, () => { + const usage = computeContextUsage({ + contextInputTokens: 120_000, + outputTokens: 20_000, + contextWindow: 100_000, + }) + expect(usage?.ratio).toBe(1) + // usedTokens is the true (un-clamped) count for the tooltip. + expect(usage?.usedTokens).toBe(140_000) + }) + + it(`returns null for an unknown or non-positive window`, () => { + expect( + computeContextUsage({ contextInputTokens: 10, contextWindow: 0 }) + ).toBeNull() + expect( + computeContextUsage({ + contextInputTokens: 10, + contextWindow: Number.NaN, + }) + ).toBeNull() + }) +}) + +describe(`contextUsageLevel`, () => { + it(`is normal below the background-compaction threshold`, () => { + expect(contextUsageLevel(0)).toBe(`normal`) + expect(contextUsageLevel(CONTEXT_USAGE_BACKGROUND_START - 0.01)).toBe( + `normal` + ) + }) + + it(`is warning between background start and the hard ceiling`, () => { + expect(contextUsageLevel(CONTEXT_USAGE_BACKGROUND_START)).toBe(`warning`) + expect(contextUsageLevel(CONTEXT_USAGE_HARD_CEILING - 0.01)).toBe(`warning`) + }) + + it(`is critical at or above the hard ceiling`, () => { + expect(contextUsageLevel(CONTEXT_USAGE_HARD_CEILING)).toBe(`critical`) + expect(contextUsageLevel(1)).toBe(`critical`) + }) +}) + +describe(`formatContextUsagePercent`, () => { + it(`renders a whole-percent label`, () => { + expect(formatContextUsagePercent(0)).toBe(`0%`) + expect(formatContextUsagePercent(0.426)).toBe(`43%`) + expect(formatContextUsagePercent(1)).toBe(`100%`) + }) +}) + +describe(`selectLatestContextUsage`, () => { + it(`picks the step with the highest _seq that reported usage`, () => { + const usage = selectLatestContextUsage([ + { _seq: 1, context_input_tokens: 10_000, context_window: 100_000 }, + { + _seq: 3, + context_input_tokens: 60_000, + output_tokens: 5_000, + context_window: 100_000, + }, + { _seq: 2, context_input_tokens: 30_000, context_window: 100_000 }, + ]) + expect(usage?.usedTokens).toBe(65_000) + expect(usage?.ratio).toBe(0.65) + }) + + it(`ignores steps that have not reported context usage`, () => { + const usage = selectLatestContextUsage([ + { _seq: 5, output_tokens: 9 }, // started, no usage yet — must be skipped + { _seq: 2, context_input_tokens: 40_000, context_window: 100_000 }, + ]) + expect(usage?.usedTokens).toBe(40_000) + }) + + it(`returns null when no step has reported usage`, () => { + expect(selectLatestContextUsage([])).toBeNull() + expect(selectLatestContextUsage([{ _seq: 1, output_tokens: 5 }])).toBeNull() + }) +}) + +describe(`shouldSurfaceContextBudget`, () => { + it(`is false below 25% and true at/above it`, () => { + expect(shouldSurfaceContextBudget(0.1)).toBe(false) + expect(shouldSurfaceContextBudget(0.2499)).toBe(false) + expect(shouldSurfaceContextBudget(0.25)).toBe(true) + expect(shouldSurfaceContextBudget(0.9)).toBe(true) + }) +}) + +describe(`formatContextBudgetNotice`, () => { + it(`states remaining tokens and percent left`, () => { + const usage = computeContextUsage({ + contextInputTokens: 75_000, + contextWindow: 100_000, + })! + // 100k window, 75k used → 25k (25%) remaining. + expect(formatContextBudgetNotice(usage)).toBe( + `You have about 25k tokens (25%) of the context window remaining.` + ) + }) + + it(`never reports negative remaining when over the window`, () => { + const usage = computeContextUsage({ + contextInputTokens: 130_000, + contextWindow: 100_000, + })! + expect(formatContextBudgetNotice(usage)).toBe( + `You have about 0 tokens (0%) of the context window remaining.` + ) + }) +}) + +describe(`withContextBudgetNotice`, () => { + const user = (text: string): LLMMessage => ({ role: `user`, content: text }) + const lowUsage = computeContextUsage({ + contextInputTokens: 10_000, + contextWindow: 100_000, + }) + const highUsage = computeContextUsage({ + contextInputTokens: 80_000, + contextWindow: 100_000, + }) + + it(`leaves messages unchanged when usage is unknown`, () => { + const messages = [user(`hello`)] + expect(withContextBudgetNotice(messages, null)).toEqual(messages) + }) + + it(`leaves messages unchanged below the first threshold`, () => { + const messages = [user(`hello`)] + expect(withContextBudgetNotice(messages, lowUsage)).toEqual(messages) + }) + + it(`injects the notice just before the final message`, () => { + const messages = [user(`first`), user(`latest`)] + const result = withContextBudgetNotice(messages, highUsage) + expect(result).toHaveLength(3) + expect(result[0]).toEqual(user(`first`)) + expect(result[1]?.role).toBe(`user`) + expect(String(result[1]?.content)).toContain(``) + // The final message (the live turn) stays last so runInput detection + // that reads `.at(-1)` is unaffected. + expect(result[2]).toEqual(user(`latest`)) + }) + + it(`emits just the notice when there are no messages yet`, () => { + const result = withContextBudgetNotice([], highUsage) + expect(result).toHaveLength(1) + expect(String(result[0]?.content)).toContain(`token_budget`) + }) +}) + +describe(`truncateOversizedToolResults`, () => { + const toolResult = (content: string): LLMMessage => ({ + role: `tool_result`, + toolCallId: `call-1`, + isError: false, + content, + }) + + it(`replaces a tool result that exceeds the cap with a placeholder`, () => { + // approxTokens ≈ chars/4, so 400 chars ≈ 100 tokens > cap of 10. + const big = `x`.repeat(400) + const [out] = truncateOversizedToolResults([toolResult(big)], 10) + expect(String(out?.content)).toContain(`Output truncated`) + // pairing metadata is preserved so the tool_call/result stays valid + expect(out).toMatchObject({ role: `tool_result`, toolCallId: `call-1` }) + }) + + it(`leaves a tool result within the cap untouched`, () => { + const small = toolResult(`small output`) + const [out] = truncateOversizedToolResults([small], 10) + expect(out).toBe(small) + }) + + it(`never touches non-tool-result messages`, () => { + const user: LLMMessage = { role: `user`, content: `x`.repeat(400) } + const [out] = truncateOversizedToolResults([user], 10) + expect(out).toBe(user) + }) +}) diff --git a/packages/agents-server-ui/src/components/CompactionIndicator.module.css b/packages/agents-server-ui/src/components/CompactionIndicator.module.css new file mode 100644 index 0000000000..e610a7be94 --- /dev/null +++ b/packages/agents-server-ui/src/components/CompactionIndicator.module.css @@ -0,0 +1,44 @@ +.indicator { + display: inline-flex; + align-items: center; + gap: 0.4rem; + padding: 0 0.25rem; + color: var(--warning-text, #b26a00); + user-select: none; +} + +/* Background compaction is non-blocking — render it muted/subtle so it doesn't + read as "you're blocked" like the sync (amber) indicator. */ +.background { + color: var(--text-muted, rgba(0, 0, 0, 0.55)); + opacity: 0.8; +} + +.label { + font-size: 0.72rem; + line-height: 1; + color: inherit; +} + +.spinner { + width: 10px; + height: 10px; + flex-shrink: 0; + border-radius: 50%; + border: 2px solid currentColor; + border-top-color: transparent; + opacity: 0.8; + animation: compaction-spin 0.7s linear infinite; +} + +@keyframes compaction-spin { + to { + transform: rotate(360deg); + } +} + +@media (prefers-reduced-motion: reduce) { + .spinner { + animation: none; + } +} diff --git a/packages/agents-server-ui/src/components/CompactionIndicator.tsx b/packages/agents-server-ui/src/components/CompactionIndicator.tsx new file mode 100644 index 0000000000..b134ffb26e --- /dev/null +++ b/packages/agents-server-ui/src/components/CompactionIndicator.tsx @@ -0,0 +1,85 @@ +import { useEffect, useMemo, useState } from 'react' +import { useLiveQuery } from '@tanstack/react-db' +import type { EntityStreamDBWithActions } from '@electric-ax/agents-runtime/client' +import { + STALE_RUNNING_MS, + isRunningCheckpointOrphaned, +} from '../lib/compactionIndicator' +import styles from './CompactionIndicator.module.css' + +/** + * Spinner shown while a context compaction is in flight. The runtime writes a + * `running` compaction checkpoint before summarizing and supersedes it with + * `complete`/`failed` when done; we show the spinner while the latest is + * `running`. Orphaned (crashed) running rows are cleared after a timeout — see + * `lib/compactionIndicator`. + */ + +interface CheckpointRow { + _seq?: number + timestamp?: string + attrs?: { kind?: string; status?: string; background?: boolean } +} + +interface CompactionIndicatorProps { + db: EntityStreamDBWithActions | null +} + +export function CompactionIndicator({ + db, +}: CompactionIndicatorProps): React.ReactElement | null { + const { data: rows = [] } = useLiveQuery( + (q) => + db && db.collections.contextInserted + ? q.from({ entry: db.collections.contextInserted as any }) + : undefined, + [db] + ) + + const latest = useMemo(() => { + // The newest compaction checkpoint wins (later writes supersede earlier ones + // for the same id). Only a `running` one drives the spinner. + let latest: CheckpointRow | null = null + for (const row of rows as Array) { + if (row.attrs?.kind !== `compaction`) continue + if (!latest || (row._seq ?? 0) > (latest._seq ?? 0)) latest = row + } + return latest?.attrs?.status === `running` ? latest : null + }, [rows]) + + const runningSince = latest?.timestamp + ? Date.parse(latest.timestamp) + : Number.NaN + + // Re-render once the running checkpoint crosses the staleness deadline, so an + // orphaned spinner clears itself even if no further events arrive. + const [, bump] = useState(0) + useEffect(() => { + if (!latest || !Number.isFinite(runningSince)) return + const remaining = STALE_RUNNING_MS - (Date.now() - runningSince) + if (remaining <= 0) return + const id = setTimeout(() => bump((n) => n + 1), remaining) + return () => clearTimeout(id) + }, [latest, runningSince]) + + if (!latest) return null + if (isRunningCheckpointOrphaned(latest.timestamp, Date.now())) return null + + // Background compaction is non-blocking, so it's shown subtly and distinctly + // from the blocking (sync, mid-turn) "Compacting context…". + const background = Boolean(latest.attrs?.background) + return ( + + + ) +} diff --git a/packages/agents-server-ui/src/components/ContextUsageIndicator.module.css b/packages/agents-server-ui/src/components/ContextUsageIndicator.module.css new file mode 100644 index 0000000000..ec502c41d2 --- /dev/null +++ b/packages/agents-server-ui/src/components/ContextUsageIndicator.module.css @@ -0,0 +1,39 @@ +.indicator { + display: inline-flex; + align-items: center; + gap: 0.3rem; + padding: 0 0.25rem; + font-variant-numeric: tabular-nums; + color: var(--text-muted, rgba(0, 0, 0, 0.55)); + user-select: none; + cursor: default; +} + +.percent { + font-size: 0.72rem; + line-height: 1; + color: inherit; +} + +.dot { + width: 6px; + height: 6px; + border-radius: 50%; + background: currentColor; + opacity: 0.6; + flex-shrink: 0; +} + +/* Levels mirror the compaction thresholds: warning once background + compaction would start (85%), critical at the hard ceiling (90%). */ +.normal { + color: var(--text-muted, rgba(0, 0, 0, 0.55)); +} + +.warning { + color: var(--warning-text, #b26a00); +} + +.critical { + color: var(--danger-text, #a02020); +} diff --git a/packages/agents-server-ui/src/components/ContextUsageIndicator.tsx b/packages/agents-server-ui/src/components/ContextUsageIndicator.tsx new file mode 100644 index 0000000000..4ebc6e2e01 --- /dev/null +++ b/packages/agents-server-ui/src/components/ContextUsageIndicator.tsx @@ -0,0 +1,91 @@ +import { useMemo } from 'react' +import { useLiveQuery } from '@tanstack/react-db' +import { + computeContextUsage, + contextUsageLevel, + formatContextUsagePercent, + formatTokenCount, +} from '@electric-ax/agents-runtime/client' +import type { EntityStreamDBWithActions } from '@electric-ax/agents-runtime/client' +import { Tooltip } from '../ui/Tooltip' +import styles from './ContextUsageIndicator.module.css' + +/** + * Context-window gauge for the composer footer ("X% used"). Reads the same + * numbers the runtime persists for compaction (the latest step's cache-inclusive + * prompt size and the model's window) through `computeContextUsage`. + */ + +interface StepRow { + _seq?: number + context_input_tokens?: number + context_window?: number + output_tokens?: number + model_id?: string +} + +interface ContextUsageIndicatorProps { + db: EntityStreamDBWithActions | null +} + +export function ContextUsageIndicator({ + db, +}: ContextUsageIndicatorProps): React.ReactElement | null { + const { data: steps = [] } = useLiveQuery( + (q) => + db && db.collections.steps + ? q.from({ step: db.collections.steps as any }) + : undefined, + [db] + ) + + const usage = useMemo(() => { + // The most-recently-started step that reported context usage holds the + // freshest, fullest prompt size (the last step of the latest run carries + // the whole conversation). `_seq` is the collection's monotonic insertion + // order, so the max among completed steps is the latest. + let latest: StepRow | null = null + for (const row of steps as Array) { + if ( + typeof row.context_window !== `number` || + row.context_window <= 0 || + typeof row.context_input_tokens !== `number` + ) { + continue + } + if (!latest || (row._seq ?? 0) > (latest._seq ?? 0)) { + latest = row + } + } + if (!latest) return null + const computed = computeContextUsage({ + contextInputTokens: latest.context_input_tokens as number, + outputTokens: latest.output_tokens, + contextWindow: latest.context_window as number, + }) + return computed ? { ...computed, modelId: latest.model_id } : null + }, [steps]) + + if (!usage) return null + + const level = contextUsageLevel(usage.ratio) + const percent = formatContextUsagePercent(usage.ratio) + const tokensLabel = `${formatTokenCount(usage.usedTokens)} / ${formatTokenCount( + usage.contextWindow + )} tokens` + const tooltip = usage.modelId + ? `${tokensLabel} · ${usage.modelId}` + : tokensLabel + + return ( + + + + + ) +} diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index feee790a8c..9a52a8c94f 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -21,6 +21,7 @@ import { Database, ExternalLink, FileJson, + FoldVertical, GitBranch, Radio, Reply, @@ -243,7 +244,7 @@ function estimateRowHeight( const lines = Math.max(1, Math.ceil(row.comment.body.length / charsPerLine)) return Math.max(58, 42 + lines * lineHeight) + timelineRowGap(row, nextRow) } - if (row.wake || row.signal || row.manifest) { + if (row.wake || row.signal || row.manifest || row.compaction) { return 76 + timelineRowGap(row, nextRow) } return 120 + timelineRowGap(row, nextRow) @@ -261,7 +262,9 @@ function timelineRowGap( nextRow?: RenderTimelineRow ): number { if (shouldCollapseCommentMeta(row, nextRow)) return 6 - return row.manifest || row.wake || row.signal ? MANIFEST_ROW_GAP : ROW_GAP + return row.manifest || row.wake || row.signal || row.compaction + ? MANIFEST_ROW_GAP + : ROW_GAP } function isPlainCommentRow(row: RenderTimelineRow | undefined): boolean { @@ -309,11 +312,13 @@ function timelineRowSearchText( if (row.signal) return signalSearchText(row.signal) if (row.error) return `${row.error.error_code} ${row.error.message}` if (row.manifest) return manifestSearchText(row.manifest) + if (row.compaction) return `context compacted ${row.compaction.content}` return runSearchTextByKey.get(row.$key) ?? runSearchTextFromSnapshot(row.run) } function timelineRowLabel(row: RenderTimelineRow): string { if (row.comment) return `Comment` + if (row.compaction) return `Context compacted` if (row.inbox?.from_agent) return `Agent message` if (row.inbox) return `User message` if (row.wake) return `Wake` @@ -676,6 +681,28 @@ function ErrorTimelineRow({ ) } +function CompactionTimelineRow({ + compaction, +}: { + compaction: NonNullable +}): React.ReactElement { + const summary = compaction.content.trim() + return ( +
+ + {summary.length > 0 ? ( +
{summary}
+ ) : undefined} +
+
+ ) +} + function signalSearchText( signal: NonNullable ): string { @@ -1313,6 +1340,10 @@ const TimelineRow = memo(function TimelineRow({ return } + if (row.compaction) { + return + } + if (row.manifest) { return ( - ) : null + <> + {imageAttachmentsEnabled && !isCommentMode ? ( + + ) : null} + + + } send={ diff --git a/packages/agents-server-ui/src/hooks/useEntityTimeline.ts b/packages/agents-server-ui/src/hooks/useEntityTimeline.ts index 8834485356..c3dbc044e6 100644 --- a/packages/agents-server-ui/src/hooks/useEntityTimeline.ts +++ b/packages/agents-server-ui/src/hooks/useEntityTimeline.ts @@ -9,6 +9,10 @@ import { import { coalesce, eq } from '@durable-streams/state/db' import { connectEntityStream } from '../lib/entity-connection' import { createCommentsTimelineSource } from '../lib/comments' +import { + createCompactionTimelineSource, + isCompletedCompactionRow, +} from '../lib/compaction' import type { TimelineRow } from '../lib/comments' import type { EntityStreamDBWithActions, @@ -122,9 +126,12 @@ export function useEntityTimeline( (q) => { if (!db) return undefined return createEntityTimelineQuery(db, { - ...(includeComments && { - customSources: { comment: createCommentsTimelineSource(db) }, - }), + customSources: { + compaction: createCompactionTimelineSource(db), + ...(includeComments && { + comment: createCommentsTimelineSource(db), + }), + }, })(q) }, [db, includeComments] @@ -155,7 +162,16 @@ export function useEntityTimeline( : undefined, [db] ) - const typedTimelineRows = timelineRows as Array + // Only a *completed* compaction checkpoint renders as a timeline marker; + // `running`/`failed` checkpoints are surfaced by the live composer indicator + // instead, not as message-history entries. + const typedTimelineRows = useMemo( + () => + (timelineRows as Array).filter( + (row) => !row.compaction || isCompletedCompactionRow(row) + ), + [timelineRows] + ) const pendingInbox = useMemo( () => diff --git a/packages/agents-server-ui/src/lib/comments.ts b/packages/agents-server-ui/src/lib/comments.ts index 056addc82f..8279261f56 100644 --- a/packages/agents-server-ui/src/lib/comments.ts +++ b/packages/agents-server-ui/src/lib/comments.ts @@ -14,6 +14,7 @@ import type { EntityTimelineCustomSource, EntityTimelineQueryRow, } from '@electric-ax/agents-runtime/client' +import type { CompactionTimelineRow } from './compaction' /** * Comments are a UI-level concern: the runtime timeline query knows nothing @@ -33,6 +34,7 @@ export type EntityTimelineCommentRow = { export type CommentTimelineRow = { $key: string comment: EntityTimelineCommentRow + compaction?: undefined inbox?: undefined run?: undefined wake?: undefined @@ -41,10 +43,14 @@ export type CommentTimelineRow = { manifest?: undefined } -/** Timeline row as consumed by UI views: runtime rows plus merged comment rows. */ +/** + * Timeline row as consumed by UI views: runtime rows plus merged custom-source + * rows (comments, compaction markers). + */ export type TimelineRow = - | (EntityTimelineQueryRow & { comment?: undefined }) + | (EntityTimelineQueryRow & { comment?: undefined; compaction?: undefined }) | CommentTimelineRow + | CompactionTimelineRow /** * Timeline source for the `comments` collection, passed to the runtime's diff --git a/packages/agents-server-ui/src/lib/compaction.ts b/packages/agents-server-ui/src/lib/compaction.ts new file mode 100644 index 0000000000..1d0db61bde --- /dev/null +++ b/packages/agents-server-ui/src/lib/compaction.ts @@ -0,0 +1,62 @@ +import { coalesce, eq } from '@durable-streams/state/db' +import { TIMELINE_ORDER_FALLBACK } from '@electric-ax/agents-runtime/client' +import type { + EntityStreamDBWithActions, + EntityTimelineCustomSource, +} from '@electric-ax/agents-runtime/client' + +/** + * Compaction checkpoints are persisted as `context_inserted` rows tagged + * `name: "compaction_summary"`. The runtime timeline query doesn't surface + * them; `useEntityTimeline` merges them in via this custom source so a + * "Context compacted" marker can render in the message history at the point + * compaction happened. Must match the runtime's COMPACTION_CHECKPOINT_NAME. + */ +const COMPACTION_CHECKPOINT_NAME = `compaction_summary` + +export type EntityTimelineCompactionRow = { + key: string + order: string + attrs?: { kind?: string; status?: string } + content: string + timestamp: string +} + +export type CompactionTimelineRow = { + $key: string + compaction: EntityTimelineCompactionRow + comment?: undefined + inbox?: undefined + run?: undefined + wake?: undefined + signal?: undefined + error?: undefined + manifest?: undefined +} + +export function createCompactionTimelineSource( + db: EntityStreamDBWithActions +): EntityTimelineCustomSource { + const contextInserted = (db.collections as Record) + .contextInserted + return (q) => + q + .from({ compaction: contextInserted }) + .where(({ compaction }: any) => + eq(compaction.name, COMPACTION_CHECKPOINT_NAME) + ) + .select(({ compaction }: any) => ({ + order: coalesce(compaction._timeline_order, TIMELINE_ORDER_FALLBACK), + key: compaction.key, + attrs: compaction.attrs, + content: coalesce(compaction.content, ``), + timestamp: coalesce(compaction.timestamp, ``), + })) +} + +/** A completed compaction is the persistent marker; running/failed are not. */ +export function isCompletedCompactionRow(row: { + compaction?: EntityTimelineCompactionRow +}): boolean { + return row.compaction?.attrs?.status === `complete` +} diff --git a/packages/agents-server-ui/src/lib/compactionIndicator.test.ts b/packages/agents-server-ui/src/lib/compactionIndicator.test.ts new file mode 100644 index 0000000000..9b1a7f4361 --- /dev/null +++ b/packages/agents-server-ui/src/lib/compactionIndicator.test.ts @@ -0,0 +1,38 @@ +import { describe, expect, it } from 'vitest' +import { + STALE_RUNNING_MS, + isRunningCheckpointOrphaned, +} from './compactionIndicator' + +describe(`isRunningCheckpointOrphaned`, () => { + const now = 1_000_000_000_000 + const iso = (ms: number) => new Date(ms).toISOString() + + it(`is NOT orphaned for a fresh running checkpoint`, () => { + expect(isRunningCheckpointOrphaned(iso(now), now)).toBe(false) + expect(isRunningCheckpointOrphaned(iso(now - 30_000), now)).toBe(false) + }) + + it(`is NOT orphaned just under the staleness deadline`, () => { + expect( + isRunningCheckpointOrphaned(iso(now - (STALE_RUNNING_MS - 1)), now) + ).toBe(false) + }) + + it(`IS orphaned at/after the staleness deadline (crashed mid-summarize)`, () => { + expect(isRunningCheckpointOrphaned(iso(now - STALE_RUNNING_MS), now)).toBe( + true + ) + expect( + isRunningCheckpointOrphaned(iso(now - STALE_RUNNING_MS * 10), now) + ).toBe(true) + }) + + it(`treats a missing or unparseable timestamp as NOT orphaned`, () => { + // We can't prove staleness, so we keep showing the spinner rather than hide + // a possibly-live compaction. (insertContext always stamps a timestamp.) + expect(isRunningCheckpointOrphaned(undefined, now)).toBe(false) + expect(isRunningCheckpointOrphaned(``, now)).toBe(false) + expect(isRunningCheckpointOrphaned(`not-a-date`, now)).toBe(false) + }) +}) diff --git a/packages/agents-server-ui/src/lib/compactionIndicator.ts b/packages/agents-server-ui/src/lib/compactionIndicator.ts new file mode 100644 index 0000000000..9c78ee3c45 --- /dev/null +++ b/packages/agents-server-ui/src/lib/compactionIndicator.ts @@ -0,0 +1,21 @@ +// Staleness logic for the "Compacting…" indicator, split out so it's unit +// testable without a render harness. + +/** A `running` checkpoint older than this is orphaned (its process crashed + * before a terminal row); comfortably above the summarize timeout. */ +export const STALE_RUNNING_MS = 150_000 + +/** + * Whether a `running` checkpoint with `timestamp` (ISO) is orphaned at `now` + * (ms). A missing/unparseable timestamp counts as live — we can't prove + * staleness, and hiding an in-flight compaction is worse than over-showing one. + */ +export function isRunningCheckpointOrphaned( + timestamp: string | undefined, + now: number +): boolean { + if (!timestamp) return false + const since = Date.parse(timestamp) + if (!Number.isFinite(since)) return false + return now - since >= STALE_RUNNING_MS +}