diff --git a/src/services/AgentSummary/__tests__/agentSummary.test.ts b/src/services/AgentSummary/__tests__/agentSummary.test.ts index 368671f03..421219a94 100644 --- a/src/services/AgentSummary/__tests__/agentSummary.test.ts +++ b/src/services/AgentSummary/__tests__/agentSummary.test.ts @@ -5,7 +5,10 @@ import type { CacheSafeParams, ForkedAgentResult, } from '../../../utils/forkedAgent.js' -import { startAgentSummarization } from '../agentSummary.js' +import { + type AgentSummaryDependencies, + startAgentSummarization, +} from '../agentSummary.js' const transcriptMessages = [ { type: 'user', message: { content: 'start' }, uuid: 'u1' }, @@ -27,17 +30,16 @@ describe('startAgentSummarization', () => { let forkCalls: ForkCall[] let updateCalls: Array<{ taskId: string; summary: string }> let transcriptMessagesForTest: Message[] + let debugLogs: string[] + let loggedErrors: Error[] + let clearedHandles: unknown[] + let scheduledCount: number + let lastTimerHandle: unknown - beforeEach(() => { - forkCalls = [] - updateCalls = [] - scheduled = undefined - handle = undefined - transcriptMessagesForTest = transcriptMessages - }) - - test('summarizes bounded transcript once and skips unchanged fingerprints', async () => { - handle = startAgentSummarization( + function startTestSummarization( + dependencies: AgentSummaryDependencies = {}, + ): { stop: () => void } { + return startAgentSummarization( 'task-1', asAgentId('a0000000000000000'), { @@ -48,14 +50,22 @@ describe('startAgentSummarization', () => { } as unknown as CacheSafeParams, () => undefined, { - clearTimeout: () => undefined, + clearTimeout: ((timeoutId: unknown) => { + clearedHandles.push(timeoutId) + }) as typeof clearTimeout, getAgentTranscript: async () => ({ messages: transcriptMessagesForTest, contentReplacements: [], }), isPoorModeActive: () => false, - logError: () => undefined, - logForDebugging: () => undefined, + logError: error => { + loggedErrors.push( + error instanceof Error ? error : new Error(String(error)), + ) + }, + logForDebugging: message => { + debugLogs.push(message) + }, runForkedAgent: async (args: ForkCall) => { forkCalls.push(args) return { @@ -73,14 +83,34 @@ describe('startAgentSummarization', () => { if (typeof callback !== 'function') { throw new Error('Expected timer callback') } + scheduledCount += 1 scheduled = callback as () => void | Promise - return 1 as unknown as ReturnType + lastTimerHandle = { id: scheduledCount } + return lastTimerHandle as ReturnType }) as unknown as typeof setTimeout, updateAgentSummary: (taskId: string, summary: string) => { updateCalls.push({ taskId, summary }) }, + ...dependencies, }, ) + } + + beforeEach(() => { + forkCalls = [] + updateCalls = [] + scheduled = undefined + handle = undefined + transcriptMessagesForTest = transcriptMessages + debugLogs = [] + loggedErrors = [] + clearedHandles = [] + scheduledCount = 0 + lastTimerHandle = undefined + }) + + test('summarizes bounded transcript once and skips unchanged fingerprints', async () => { + handle = startTestSummarization() expect(typeof scheduled).toBe('function') await scheduled!() @@ -104,49 +134,95 @@ describe('startAgentSummarization', () => { expect(forkCalls).toHaveLength(1) expect(updateCalls).toHaveLength(1) + expect(loggedErrors).toEqual([]) }) - test('skips summarization when bounded context is too small', async () => { - transcriptMessagesForTest = transcriptMessages.slice(0, 2) - - handle = startAgentSummarization( - 'task-1', - asAgentId('a0000000000000000'), + test('skips summarization when filtering leaves too little bounded context', async () => { + transcriptMessagesForTest = [ + { type: 'user', message: { content: 'start' }, uuid: 'u1' }, { - forkContextMessages: transcriptMessages, - model: 'claude-test', - } as unknown as CacheSafeParams, - () => undefined, - { - clearTimeout: () => undefined, - getAgentTranscript: async () => ({ - messages: transcriptMessagesForTest, - contentReplacements: [], - }), - isPoorModeActive: () => false, - logError: () => undefined, - logForDebugging: () => undefined, - runForkedAgent: async (args: ForkCall) => { - forkCalls.push(args) - return { messages: [] } as unknown as ForkedAgentResult - }, - setTimeout: ((callback: TimerHandler) => { - if (typeof callback !== 'function') { - throw new Error('Expected timer callback') - } - scheduled = callback as () => void | Promise - return 1 as unknown as ReturnType - }) as unknown as typeof setTimeout, - updateAgentSummary: (taskId: string, summary: string) => { - updateCalls.push({ taskId, summary }) + type: 'assistant', + uuid: 'a1', + message: { + content: [{ type: 'tool_use', id: 'missing', name: 'Read' }], }, }, + { type: 'user', message: { content: 'continue' }, uuid: 'u2' }, + ] as unknown as Message[] + + handle = startTestSummarization() + + expect(typeof scheduled).toBe('function') + await scheduled!() + + expect(forkCalls).toEqual([]) + expect(updateCalls).toEqual([]) + expect(debugLogs).toContain( + '[AgentSummary] Skipping summary for task-1: no bounded context available', ) + }) + + test('skips summarization before building context when transcript is too short', async () => { + transcriptMessagesForTest = transcriptMessages.slice(0, 2) + handle = startTestSummarization() expect(typeof scheduled).toBe('function') await scheduled!() expect(forkCalls).toEqual([]) expect(updateCalls).toEqual([]) + expect(debugLogs).toContain( + '[AgentSummary] Skipping summary for task-1: not enough messages (2)', + ) + }) + + test('skips and reschedules while poor mode is active', async () => { + handle = startTestSummarization({ + isPoorModeActive: () => true, + }) + + expect(typeof scheduled).toBe('function') + const initialScheduledCount = scheduledCount + const initialTimerHandle = lastTimerHandle + await scheduled!() + + expect(forkCalls).toEqual([]) + expect(updateCalls).toEqual([]) + expect(debugLogs).toContain( + '[AgentSummary] Skipping summary — poor mode active', + ) + expect(scheduledCount).toBe(initialScheduledCount + 1) + expect(lastTimerHandle).not.toBe(initialTimerHandle) + }) + + test('logs summary errors and schedules the next timer', async () => { + const error = new Error('fork failed') + handle = startTestSummarization({ + runForkedAgent: async () => { + throw error + }, + }) + + expect(typeof scheduled).toBe('function') + const initialScheduledCount = scheduledCount + const initialTimerHandle = lastTimerHandle + await scheduled!() + + expect(loggedErrors).toEqual([error]) + expect(updateCalls).toEqual([]) + expect(scheduledCount).toBe(initialScheduledCount + 1) + expect(lastTimerHandle).not.toBe(initialTimerHandle) + }) + + test('stop clears the pending summary timer', () => { + handle = startTestSummarization() + const pendingHandle = lastTimerHandle + + handle.stop() + + expect(debugLogs).toContain( + '[AgentSummary] Stopping summarization for task-1', + ) + expect(clearedHandles).toEqual([pendingHandle]) }) }) diff --git a/src/services/AgentSummary/__tests__/summaryContext.test.ts b/src/services/AgentSummary/__tests__/summaryContext.test.ts index fe0eb3057..5aafcf3c1 100644 --- a/src/services/AgentSummary/__tests__/summaryContext.test.ts +++ b/src/services/AgentSummary/__tests__/summaryContext.test.ts @@ -141,6 +141,13 @@ describe('getSummaryContextFingerprint', () => { expect(estimateMessageChars(message)).toBeGreaterThan(0) }) + test('treats unsupported top-level primitives as zero-size estimates', () => { + expect( + estimateMessageChars((() => undefined) as unknown as Message), + ).toBe(0) + expect(estimateMessageChars(1n as unknown as Message)).toBe(0) + }) + test('returns null for an empty transcript', () => { expect(getSummaryContextFingerprint([])).toBeNull() }) diff --git a/src/utils/__tests__/teammateMailbox.test.ts b/src/utils/__tests__/teammateMailbox.test.ts index f6279dab7..d3dc36e54 100644 --- a/src/utils/__tests__/teammateMailbox.test.ts +++ b/src/utils/__tests__/teammateMailbox.test.ts @@ -1,9 +1,10 @@ import { afterEach, beforeEach, describe, expect, test } from 'bun:test' -import { mkdir, readFile, rm, writeFile } from 'node:fs/promises' +import { mkdir, readFile, rm, stat, writeFile } from 'node:fs/promises' import { mkdtempSync } from 'node:fs' import { tmpdir } from 'node:os' import { dirname, join } from 'node:path' import type { Message } from 'src/types/message.js' +import { getErrnoCode } from 'src/utils/errors.js' import { compactMailboxMessages, getLastPeerDmSummary, @@ -171,6 +172,17 @@ describe('compactMailboxMessages', () => { expect(compacted).toEqual([]) }) + + test('returns an empty mailbox when all retention lanes are disabled', () => { + const compacted = compactMailboxMessages([message('unread', false)], { + maxMessages: 0, + maxReadMessages: 0, + maxUnreadProtocolMessages: 0, + maxRetainedBytes: 1_000, + }) + + expect(compacted).toEqual([]) + }) }) describe('teammate mailbox retention', () => { @@ -331,6 +343,32 @@ describe('teammate mailbox retention', () => { expect(await readFile(inboxPath, 'utf-8')).toBe('{not-json') }) + test('writeToMailbox rejects when the inbox path is already a directory', async () => { + const inboxPath = getInboxPath('worker', 'alpha') + await mkdir(inboxPath, { recursive: true }) + + const error = await writeToMailbox( + 'worker', + { + from: 'team-lead', + text: 'new', + timestamp: new Date(5).toISOString(), + }, + 'alpha', + ).then( + () => undefined, + err => err, + ) + + const code = getErrnoCode(error) + expect(code).toBeDefined() + if (code === undefined) { + throw new Error('Expected filesystem errno code') + } + expect(['EISDIR', 'EPERM', 'EACCES']).toContain(code) + expect((await stat(inboxPath)).isDirectory()).toBe(true) + }) + test('readMailbox fails closed on corrupt mailbox content', async () => { const inboxPath = getInboxPath('worker', 'alpha') await mkdir(dirname(inboxPath), { recursive: true }) diff --git a/src/utils/__tests__/udsMessaging.test.ts b/src/utils/__tests__/udsMessaging.test.ts index 392daa1ac..bebaa495d 100644 --- a/src/utils/__tests__/udsMessaging.test.ts +++ b/src/utils/__tests__/udsMessaging.test.ts @@ -11,7 +11,7 @@ import { writeFile, } from 'node:fs/promises' import { createHash } from 'node:crypto' -import { createConnection, createServer } from 'node:net' +import { createConnection, createServer, type Socket } from 'node:net' import { dirname, join } from 'node:path' import { tmpdir } from 'node:os' import { @@ -217,6 +217,90 @@ describe('UDS inbox retention', () => { ) }) + test('udsClient send reports connection failures without leaking token state', async () => { + const path = socketPath('uds-client-connect-error') + const capabilityDir = join(tempConfigDir, 'messaging-capabilities') + const capabilityName = `${createHash('sha256').update(path).digest('hex')}.json` + await mkdir(capabilityDir, { recursive: true, mode: 0o700 }) + await writeFile( + join(capabilityDir, capabilityName), + JSON.stringify({ socketPath: path, authToken: 'test-token' }), + 'utf-8', + ) + const { sendToUdsSocket, UdsPeerConnectionError } = await import( + '../udsClient.js' + ) + + const error = await sendToUdsSocket(path, 'hello').then( + () => undefined, + err => err, + ) + expect(error).toBeInstanceOf(UdsPeerConnectionError) + if (!(error instanceof UdsPeerConnectionError)) { + throw new Error('Expected UDS peer connection error') + } + expect(error.socketPath).toBe(path) + expect(error.message).not.toContain('test-token') + }) + + test('udsClient send reports response timeouts as peer connection errors', async () => { + const path = socketPath('uds-client-timeout') + const capabilityDir = join(tempConfigDir, 'messaging-capabilities') + const capabilityName = `${createHash('sha256').update(path).digest('hex')}.json` + await mkdir(capabilityDir, { recursive: true, mode: 0o700 }) + await writeFile( + join(capabilityDir, capabilityName), + JSON.stringify({ socketPath: path, authToken: 'test-token' }), + 'utf-8', + ) + if (process.platform !== 'win32') { + await mkdir(dirname(path), { recursive: true }) + } + + const sockets = new Set() + const receiver = createServer(socket => { + sockets.add(socket) + socket.on('close', () => { + sockets.delete(socket) + }) + socket.on('data', () => undefined) + }) + await new Promise((resolve, reject) => { + receiver.on('error', reject) + receiver.listen(path, () => resolve()) + }) + + try { + const { sendToUdsSocket, UdsPeerConnectionError } = await import( + '../udsClient.js' + ) + + const error = await sendToUdsSocket(path, 'hello', 50).then( + () => undefined, + err => err, + ) + expect(error).toBeInstanceOf(UdsPeerConnectionError) + if (!(error instanceof UdsPeerConnectionError)) { + throw new Error('Expected UDS peer connection timeout error') + } + expect(error.socketPath).toBe(path) + expect(error.cause).toBeInstanceOf(Error) + if (!(error.cause instanceof Error)) { + throw new Error('Expected timeout cause') + } + expect(error.cause.message).toBe('Connection timed out') + expect(error.message).not.toContain('test-token') + } finally { + for (const socket of sockets) { + socket.destroy() + } + await closeServer(receiver) + if (process.platform !== 'win32') { + await unlink(path).catch(() => undefined) + } + } + }) + test('sendUdsMessage fails closed before connecting without an auth token', async () => { await expect( sendUdsMessage(socketPath('no-auth-token'), { type: 'text', data: 'x' }), diff --git a/src/utils/__tests__/udsResponseReader.test.ts b/src/utils/__tests__/udsResponseReader.test.ts index 71203da62..3ec35422e 100644 --- a/src/utils/__tests__/udsResponseReader.test.ts +++ b/src/utils/__tests__/udsResponseReader.test.ts @@ -97,6 +97,28 @@ describe('attachUdsResponseReader', () => { expect(socket.ended).toBe(true) }) + test('continues scanning when blank and valid frames share one chunk', () => { + const socket = new FakeSocket() + let settled = false + let settledError: Error | undefined + + attachUdsResponseReader(asSocket(socket), { + maxFrameBytes: 128, + onSettled: error => { + settled = true + settledError = error + }, + }) + + socket.emitData( + Buffer.from(`\n${JSON.stringify({ type: 'response' })}\n`), + ) + + expect(settled).toBe(true) + expect(settledError).toBeUndefined() + expect(socket.ended).toBe(true) + }) + test('rejects receiver error frames', () => { const socket = new FakeSocket() let settledError: Error | undefined @@ -116,6 +138,31 @@ describe('attachUdsResponseReader', () => { expect(socket.destroyed).toBe(true) }) + test('ignores unrelated receiver frames until a terminal response arrives', () => { + const socket = new FakeSocket() + let settled = false + let settledError: Error | undefined + + attachUdsResponseReader(asSocket(socket), { + maxFrameBytes: 128, + onSettled: error => { + settled = true + settledError = error + }, + }) + + socket.emitData( + Buffer.from( + `${JSON.stringify({ type: 'notification', data: 'queued' })}\n`, + ), + ) + expect(settled).toBe(false) + + socket.emitData(Buffer.from(`${JSON.stringify({ type: 'response' })}\n`)) + expect(settled).toBe(true) + expect(settledError).toBeUndefined() + }) + test('uses custom socket error formatting', () => { const socket = new FakeSocket() let settledError: Error | undefined diff --git a/src/utils/udsClient.ts b/src/utils/udsClient.ts index e33ef3fdb..54d88f7fc 100644 --- a/src/utils/udsClient.ts +++ b/src/utils/udsClient.ts @@ -36,6 +36,19 @@ export type PeerSession = { alive: boolean } +export class UdsPeerConnectionError extends Error { + readonly socketPath: string + + constructor(socketPath: string, cause: unknown) { + super( + `Failed to connect to peer at ${socketPath}: ${errorMessage(cause)}`, + { cause }, + ) + this.name = 'UdsPeerConnectionError' + this.socketPath = socketPath + } +} + // --------------------------------------------------------------------------- // Session directory // --------------------------------------------------------------------------- @@ -193,6 +206,7 @@ export async function isPeerAlive( export async function sendToUdsSocket( targetSocketPath: string, message: string | Record, + timeoutMs = 5000, ): Promise { const { parseUdsTarget } = await import('./udsMessaging.js') const target = parseUdsTarget(targetSocketPath) @@ -237,12 +251,15 @@ export async function sendToUdsSocket( maxFrameBytes: MAX_UDS_FRAME_BYTES, onSettled: finish, formatSocketError: err => - new Error( - `Failed to connect to peer at ${target.socketPath}: ${errorMessage(err)}`, - ), + new UdsPeerConnectionError(target.socketPath, err), }) - conn.setTimeout(5000, () => { - finish(new Error('Connection timed out')) + conn.setTimeout(timeoutMs, () => { + finish( + new UdsPeerConnectionError( + target.socketPath, + new Error('Connection timed out'), + ), + ) }) }) }