diff --git a/src/main/broker.test.ts b/src/main/broker.test.ts index 12140d60..df8e1f7f 100644 --- a/src/main/broker.test.ts +++ b/src/main/broker.test.ts @@ -28,15 +28,25 @@ type MockClient = { unsubscribeChannels: ReturnType getStatus: ReturnType sendMessage: ReturnType + queryEvents: ReturnType brokerPid?: number baseUrl?: string agentNames: string[] + eventHistory: unknown[] + eventListeners: Set<(event: unknown) => void> + emitEvent: (event: unknown) => void } const mock = vi.hoisted(() => { function createMockClient(agentNames: string[] = []): MockClient { const client: MockClient = { agentNames: [...agentNames], + eventHistory: [], + eventListeners: new Set(), + emitEvent: (event: unknown) => { + client.eventHistory.push(event) + for (const listener of client.eventListeners) listener(event) + }, getSession: vi.fn(async () => ({})), listAgents: vi.fn(async () => client.agentNames.map((name) => ({ name, runtime: 'pty', channels: [] }))), getInboundDeliveryMode: vi.fn(async () => 'passthrough'), @@ -52,7 +62,22 @@ const mock = vi.hoisted(() => { agents: client.agentNames.map((name) => ({ name, runtime: 'pty', channels: [] })), pending_delivery_count: 0 })), - onEvent: vi.fn(() => () => undefined), + queryEvents: vi.fn((filter: { kind?: string; name?: string; limit?: number }) => { + const events = client.eventHistory.filter((event) => { + if (!event || typeof event !== 'object') return false + const record = event as Record + if (filter.kind && record.kind !== filter.kind) return false + if (filter.name && record.name !== filter.name) return false + return true + }) + return events.slice(-(filter.limit ?? events.length)) + }), + onEvent: vi.fn((listener: (event: unknown) => void) => { + client.eventListeners.add(listener) + return () => { + client.eventListeners.delete(listener) + } + }), addListener: vi.fn(() => () => undefined), connectEvents: vi.fn(), disconnectEvents: vi.fn(), @@ -62,7 +87,20 @@ const mock = vi.hoisted(() => { release: vi.fn(async () => undefined), subscribeChannels: vi.fn(async () => undefined), unsubscribeChannels: vi.fn(async () => undefined), - sendMessage: vi.fn(async () => ({ event_id: 'evt-message', targets: [] })), + sendMessage: vi.fn(async (input: { to?: string }) => { + const target = input.to || '' + const eventId = `evt-${Math.random().toString(16).slice(2)}` + if (target && !target.startsWith('#')) { + setImmediate(() => { + client.emitEvent({ + kind: 'delivery_injected', + event_id: eventId, + name: target + }) + }) + } + return { event_id: eventId, targets: target && !target.startsWith('#') ? [target] : [] } + }), brokerPid: 4242 } return client @@ -150,6 +188,7 @@ const originalResourcesPathDescriptor = Object.getOwnPropertyDescriptor(process, const originalPlatformDescriptor = Object.getOwnPropertyDescriptor(process, 'platform') const originalPublicEnv = process.env.PUBLIC const originalProgramDataEnv = process.env.ProgramData +const originalPersonaHarnessReadyTimeoutEnv = process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS function setProcessPlatform(platform: NodeJS.Platform): void { Object.defineProperty(process, 'platform', { @@ -243,6 +282,14 @@ async function writeAgentWorkforceFixture(projectDir: string): Promise { await chmod(winBin, 0o755) } +function emitPersonaHarnessReady(client: MockClient, name: string): void { + client.emitEvent({ + kind: 'worker_stream', + name, + chunk: 'Sandbox mount ready -> /tmp/agentworkforce-session\n' + }) +} + describe('resolveAgentRelayMcpCommand', () => { let tempDir: string | null = null let extraTempDir: string | null = null @@ -366,6 +413,11 @@ describe('BrokerManager local + cloud coexistence', () => { }) afterEach(async () => { + if (originalPersonaHarnessReadyTimeoutEnv === undefined) { + delete process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS + } else { + process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = originalPersonaHarnessReadyTimeoutEnv + } if (personaTempDir) await rm(personaTempDir, { recursive: true, force: true }) personaTempDir = null }) @@ -605,6 +657,7 @@ describe('BrokerManager local + cloud coexistence', () => { const local = lastSpawned() local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { local.agentNames.push(input.name) + setImmediate(() => emitPersonaHarnessReady(local, input.name)) return { name: input.name, runtime: 'pty', @@ -615,6 +668,12 @@ describe('BrokerManager local + cloud coexistence', () => { const result = await manager.spawnPersona(PROJECT_ID, 'autonomous-actor') + expect(local.spawnPty).toHaveBeenCalledWith(expect.objectContaining({ + args: ['agent', 'autonomous-actor'] + })) + expect(local.spawnPty).not.toHaveBeenCalledWith(expect.objectContaining({ + args: expect.arrayContaining(['--install-in-repo']) + })) expect(result).toEqual({ name: 'autonomous-actor', runtime: 'pty', @@ -639,6 +698,7 @@ describe('BrokerManager local + cloud coexistence', () => { releaseSpawn = resolve }) local.agentNames.push(input.name) + setImmediate(() => emitPersonaHarnessReady(local, input.name)) return { name: input.name, runtime: 'pty', @@ -667,6 +727,178 @@ describe('BrokerManager local + cloud coexistence', () => { await manager.shutdown() }) + it('releases a workforce persona wrapper that never reaches harness readiness', async () => { + personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-')) + await writeAgentWorkforceFixture(personaTempDir) + process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = '20' + + const manager = new BrokerManager() + mock.state.nextLocalAgents = [] + await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, []) + const local = lastSpawned() + local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { + local.agentNames.push(input.name) + return { + name: input.name, + runtime: 'pty', + cli: 'agentworkforce' + } + }) + + await expect(manager.spawnPersona(PROJECT_ID, 'autonomous-actor')).rejects.toThrow( + /Timed out waiting for Workforce persona autonomous-actor to prepare its harness/ + ) + expect(local.release).toHaveBeenCalledWith( + 'autonomous-actor', + 'persona harness readiness verification failed' + ) + + delete process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS + await manager.shutdown() + }) + + it('does not expose a workforce persona to listAgents until harness readiness passes', async () => { + personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-')) + await writeAgentWorkforceFixture(personaTempDir) + + const manager = new BrokerManager() + mock.state.nextLocalAgents = [] + await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, []) + const local = lastSpawned() + let releaseSpawn!: () => void + local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { + local.agentNames.push(input.name) + await new Promise((resolve) => { + releaseSpawn = resolve + }) + setImmediate(() => emitPersonaHarnessReady(local, input.name)) + return { + name: input.name, + runtime: 'pty', + cli: 'agentworkforce' + } + }) + + const spawned = manager.spawnPersona(PROJECT_ID, 'autonomous-actor') + await Promise.resolve() + await Promise.resolve() + + await expect(manager.listAgents(PROJECT_ID)).resolves.toEqual([]) + releaseSpawn() + await expect(spawned).resolves.toEqual({ + name: 'autonomous-actor', + runtime: 'pty', + cli: 'claude' + }) + expect((await manager.listAgents(PROJECT_ID)).map((agent) => agent.name)).toEqual(['autonomous-actor']) + + await manager.shutdown() + }) + + it('does not expose a workforce persona to broker details until harness readiness passes', async () => { + personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-')) + await writeAgentWorkforceFixture(personaTempDir) + + const manager = new BrokerManager() + mock.state.nextLocalAgents = [] + await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, []) + const local = lastSpawned() + let releaseSpawn!: () => void + local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { + local.agentNames.push(input.name) + await new Promise((resolve) => { + releaseSpawn = resolve + }) + setImmediate(() => emitPersonaHarnessReady(local, input.name)) + return { + name: input.name, + runtime: 'pty', + cli: 'agentworkforce' + } + }) + + const spawned = manager.spawnPersona(PROJECT_ID, 'autonomous-actor') + await Promise.resolve() + await Promise.resolve() + + const [pendingDetails] = await manager.listBrokerDetails() + expect(pendingDetails.agentCount).toBe(0) + expect(pendingDetails.agents).toEqual([]) + + releaseSpawn() + await expect(spawned).resolves.toEqual({ + name: 'autonomous-actor', + runtime: 'pty', + cli: 'claude' + }) + + const [readyDetails] = await manager.listBrokerDetails() + expect(readyDetails.agentCount).toBe(1) + expect(readyDetails.agents.map((agent) => agent.name)).toEqual(['autonomous-actor']) + + await manager.shutdown() + }) + + it('releases a workforce persona when broker delivery readiness is not confirmed', async () => { + personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-')) + await writeAgentWorkforceFixture(personaTempDir) + process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = '50' + + const manager = new BrokerManager() + mock.state.nextLocalAgents = [] + await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, []) + const local = lastSpawned() + local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { + local.agentNames.push(input.name) + setImmediate(() => emitPersonaHarnessReady(local, input.name)) + return { + name: input.name, + runtime: 'pty', + cli: 'agentworkforce' + } + }) + local.sendMessage.mockResolvedValueOnce({ + event_id: 'evt-readiness-never-injected', + targets: ['autonomous-actor'] + }) + + await expect(manager.spawnPersona(PROJECT_ID, 'autonomous-actor')).rejects.toThrow( + /did not become ready for broker delivery/ + ) + expect(local.release).toHaveBeenCalledWith( + 'autonomous-actor', + 'persona harness readiness verification failed' + ) + + await manager.shutdown() + }) + + it('does not reuse old sandbox-ready output for a new workforce persona launch', async () => { + personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-')) + await writeAgentWorkforceFixture(personaTempDir) + process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = '20' + + const manager = new BrokerManager() + mock.state.nextLocalAgents = [] + await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, []) + const local = lastSpawned() + emitPersonaHarnessReady(local, 'autonomous-actor') + local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { + local.agentNames.push(input.name) + return { + name: input.name, + runtime: 'pty', + cli: 'agentworkforce' + } + }) + + await expect(manager.spawnPersona(PROJECT_ID, 'autonomous-actor')).rejects.toThrow( + /Timed out waiting for Workforce persona autonomous-actor to prepare its harness/ + ) + + await manager.shutdown() + }) + it('spawning with broker: cloud fails clearly when no sandbox is attached', async () => { const manager = new BrokerManager() await startLocal(manager) diff --git a/src/main/broker.ts b/src/main/broker.ts index ba8a1668..5597258b 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -365,6 +365,8 @@ const MAX_BROKER_TIMEOUTS_BEFORE_REVIVE = 2 const BROKER_REVIVE_TERM_GRACE_MS = 1_500 const PERSONA_REGISTRATION_TIMEOUT_MS = 5_000 const PERSONA_REGISTRATION_STABILITY_MS = 1_000 +const PERSONA_HARNESS_READY_TIMEOUT_MS = 120_000 +const PERSONA_READY_PROBE_TIMEOUT_MS = 5_000 const AGENTWORKFORCE_CLI_VERSION = '3.0.50' function delay(ms: number): Promise { @@ -406,6 +408,11 @@ type BrokerSpawnResult = { cli?: string } +type PersonaBrokerSpawnResult = BrokerSpawnResult & { + workerStreamBaselineSeq?: number + workerStreamBaselineCount: number +} + function normalizeSpawnPtyResult(value: unknown, fallbackName: string, cli?: string): BrokerSpawnResult { const record = isRecord(value) ? value : {} const name = typeof record.name === 'string' && record.name.trim() @@ -455,6 +462,33 @@ function deliveryFailureMessage(event: BrokerEvent): string { return reason || lastError || 'Broker delivery failed' } +function isWorkerStreamForAgent(event: BrokerEvent, name: string): boolean { + return brokerEventString(event, 'kind') === 'worker_stream' && brokerEventString(event, 'name') === name +} + +function brokerEventChunk(event: BrokerEvent): string { + const value = (event as unknown as Record).chunk + return typeof value === 'string' ? value : '' +} + +function brokerEventNumber(event: BrokerEvent, key: string): number | undefined { + const value = (event as unknown as Record)[key] + return typeof value === 'number' && Number.isFinite(value) ? value : undefined +} + +function personaHarnessReadyFromOutput(output: string): boolean { + return /Sandbox mount ready/i.test(output) +} + +function personaHarnessFailedFromOutput(output: string): string | undefined { + const match = output.match(/(Failed to (?:launch sandbox mount|spawn "[^"]+"(?: inside sandbox mount)?|set up sandbox mount)[^\r\n]*)/i) + return match?.[1] +} + +function tailText(text: string, maxChars = 2_000): string { + return text.length > maxChars ? text.slice(text.length - maxChars) : text +} + interface PearLineageEntry { lineageId: string agentKey: string @@ -1229,6 +1263,7 @@ export class BrokerManager { // Consecutive broker read timeouts per project/operation; after MAX we // respawn the wedged broker. Reset whenever that operation succeeds. private brokerTimeoutCounts = new Map() + private pendingPersonaReadiness = new Set() private eventStreamGenerationCounter = 0 private eventObservers = new Set() private eventHistory: BrokerEventRecord[] = [] @@ -2231,6 +2266,22 @@ export class BrokerManager { return `${sessionKey}:${name}` } + private personaReadinessKey(sessionKey: string, name: string): string { + return `${sessionKey}:${name}` + } + + private markPersonaReadinessPending(session: BrokerSession, name: string): void { + this.pendingPersonaReadiness.add(this.personaReadinessKey(sessionKeyFor(session), name)) + } + + private clearPersonaReadinessPending(session: BrokerSession, name: string): void { + this.pendingPersonaReadiness.delete(this.personaReadinessKey(sessionKeyFor(session), name)) + } + + private isPersonaReadinessPending(session: BrokerSession, name: string): boolean { + return this.pendingPersonaReadiness.has(this.personaReadinessKey(sessionKeyFor(session), name)) + } + // Returns the input stream for an agent plus whether it is *ready* to send on // (the broker has acked pty_input_ready). The WS handshake runs in the // background and is never awaited here, so a keystroke is never blocked on the @@ -2482,9 +2533,9 @@ export class BrokerManager { const persona = findWorkforcePersona(session.cwd, trimmedPersonaId, command) // Resolve the harness from `agentworkforce show --json`. The actual spawn - // is delegated to the workforce CLI (`agent --install-in-repo`), which - // reads the full inherited persona itself, so the broker only needs the - // harness for the informational `cli` field it returns. + // is delegated to the workforce CLI (`agent `), which reads the + // full inherited persona itself, stages skills, materializes sidecars in + // its sandbox mount, and injects relaycast MCP from the broker env. const resolvedHarness = persona.spec.harness ?? 'claude' const spawned = await this.spawnPersonaWithMode(session, { personaId: trimmedPersonaId, @@ -2498,19 +2549,31 @@ export class BrokerManager { projectId: session.projectId, personaId: trimmedPersonaId, name: spawned.name, - mode: 'cli-install-in-repo', + mode: 'cli-sandbox', runtime: registeredAgent.runtime, cli: registeredAgent.cli, currentState: registeredAgent.current_state }) - return spawned + this.markPersonaReadinessPending(session, spawned.name) + try { + await this.verifyPersonaHarnessReady(session, spawned.name, trimmedPersonaId, { + workerStreamBaselineSeq: spawned.workerStreamBaselineSeq, + workerStreamBaselineCount: spawned.workerStreamBaselineCount + }) + } catch (err) { + await this.releaseUnreadyPersona(session, spawned.name, 'persona harness readiness verification failed') + throw err + } finally { + this.clearPersonaReadinessPending(session, spawned.name) + } + return { + name: spawned.name, + runtime: spawned.runtime, + ...(spawned.cli ? { cli: spawned.cli } : {}) + } } - await session.client.release(spawned.name, 'persona broker registration verification failed').catch((err) => { - if (!isMissingAgentError(err)) { - console.warn(`[broker] Failed to release unverified persona agent ${spawned.name}:`, err) - } - }) + await this.releaseUnreadyPersona(session, spawned.name, 'persona broker registration verification failed') throw new Error( `Workforce persona ${trimmedPersonaId} launched but did not stay registered with the broker` ) @@ -2524,11 +2587,11 @@ export class BrokerManager { command: { cli: string; args: string[] } resolvedHarness: string } - ): Promise { + ): Promise { const existingNames = new Set( (await session.client.listAgents()).map((agent) => agent.name) ) - const personaArgs = ['agent', '--install-in-repo', input.personaId] + const personaArgs = ['agent', input.personaId] let nextInput: SpawnPtyInput = { name: getAvailableAgentName(input.baseName, existingNames), cli: input.command.cli, @@ -2539,12 +2602,20 @@ export class BrokerManager { } for (let attempt = 0; attempt < 20; attempt += 1) { + const requestedName = nextInput.name + this.markPersonaReadinessPending(session, requestedName) try { + const baseline = this.workerStreamBaseline(session, requestedName) const spawned = await session.client.spawnPty(nextInput) const safeSpawned = normalizeSpawnPtyResult(spawned, nextInput.name, input.resolvedHarness) + if (safeSpawned.name !== requestedName) { + this.clearPersonaReadinessPending(session, requestedName) + this.markPersonaReadinessPending(session, safeSpawned.name) + } this.rememberAgentSession(safeSpawned.name, sessionKeyFor(session)) - return safeSpawned + return { ...safeSpawned, ...baseline } } catch (err) { + this.clearPersonaReadinessPending(session, requestedName) if (!isAgentNameConflict(err)) { throw err } @@ -2664,6 +2735,170 @@ export class BrokerManager { return stableAgent } + private async releaseUnreadyPersona(session: BrokerSession, name: string, reason: string): Promise { + this.clearPersonaReadinessPending(session, name) + this.forgetAgentSession(name, sessionKeyFor(session)) + await session.client.release(name, reason).catch((err) => { + if (!isMissingAgentError(err)) { + console.warn(`[broker] Failed to release unready persona agent ${name}:`, err) + } + }) + } + + private workerStreamEvents(session: BrokerSession, name: string, limit: number): BrokerEvent[] { + const queryEvents = (session.client as unknown as { + queryEvents?: (filter: { kind: string; name: string; limit: number }) => BrokerEvent[] + }).queryEvents + return typeof queryEvents === 'function' + ? queryEvents.call(session.client, { kind: 'worker_stream', name, limit }) + : [] + } + + private workerStreamBaseline(session: BrokerSession, name: string): { + workerStreamBaselineSeq?: number + workerStreamBaselineCount: number + } { + const events = this.workerStreamEvents(session, name, 1000) + const seqs = events + .map((event) => brokerEventNumber(event, 'seq')) + .filter((seq): seq is number => seq !== undefined) + return { + workerStreamBaselineSeq: seqs.length > 0 ? Math.max(...seqs) : undefined, + workerStreamBaselineCount: events.length + } + } + + private isWorkerStreamAfterBaseline( + event: BrokerEvent, + index: number, + baseline: { workerStreamBaselineSeq?: number; workerStreamBaselineCount: number } + ): boolean { + const seq = brokerEventNumber(event, 'seq') + if (baseline.workerStreamBaselineSeq !== undefined) { + return seq !== undefined && seq > baseline.workerStreamBaselineSeq + } + return index >= baseline.workerStreamBaselineCount + } + + private async verifyPersonaHarnessReady( + session: BrokerSession, + name: string, + personaId: string, + baseline: { workerStreamBaselineSeq?: number; workerStreamBaselineCount: number } + ): Promise { + const timeoutMs = parsePositiveIntegerEnv('PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS', PERSONA_HARNESS_READY_TIMEOUT_MS) + const deadline = Date.now() + timeoutMs + await this.waitForPersonaHarnessReadyOutput(session, name, personaId, deadline, baseline) + await this.waitForPersonaDeliveryReadiness(session, name, personaId, deadline) + console.info('[broker] Workforce persona harness readiness verified', { + projectId: session.projectId, + personaId, + name + }) + } + + private async waitForPersonaHarnessReadyOutput( + session: BrokerSession, + name: string, + personaId: string, + deadline: number, + baseline: { workerStreamBaselineSeq?: number; workerStreamBaselineCount: number } + ): Promise { + let output = '' + let timer: ReturnType | undefined + let settled = false + let resolveReady: (() => void) | undefined + let rejectReady: ((error: Error) => void) | undefined + + const finish = (error?: Error): void => { + if (settled) return + settled = true + if (timer) clearTimeout(timer) + if (error) { + rejectReady?.(error) + } else { + resolveReady?.() + } + } + + const observe = (event: BrokerEvent): void => { + if (!isWorkerStreamForAgent(event, name)) return + output += brokerEventChunk(event) + const failure = personaHarnessFailedFromOutput(output) + if (failure) { + finish(new Error(`Workforce persona ${personaId} failed during harness startup: ${failure}`)) + return + } + if (personaHarnessReadyFromOutput(output)) { + finish() + } + } + + const remainingMs = Math.max(1, deadline - Date.now()) + if (remainingMs <= 1) { + throw new Error(`Timed out waiting for Workforce persona ${personaId} to prepare its harness`) + } + + const readyPromise = new Promise((resolve, reject) => { + resolveReady = resolve + rejectReady = reject + timer = setTimeout(() => { + finish(new Error( + `Timed out waiting for Workforce persona ${personaId} to prepare its harness. Last output:\n${tailText(output) || '(no output)'}` + )) + }, remainingMs) + }) + const unsubscribe = session.client.onEvent((event) => { + observe(event) + }) + + try { + const events = this.workerStreamEvents(session, name, 1000) + for (const [index, event] of events.entries()) { + if (this.isWorkerStreamAfterBaseline(event, index, baseline)) { + observe(event) + if (settled) break + } + } + await readyPromise + } finally { + if (timer) clearTimeout(timer) + unsubscribe() + } + } + + private async waitForPersonaDeliveryReadiness( + session: BrokerSession, + name: string, + personaId: string, + deadline: number + ): Promise { + try { + const confirmation = await this.sendMessageAndWaitForInjected(session.projectId, { + to: name, + from: 'pear', + text: 'Persona launch readiness check. No action required.', + priority: -100, + mode: 'steer', + data: { + kind: 'persona-readiness-check', + system: true, + personaId + } + } as SendMessageInput, { + timeoutMs: Math.min(PERSONA_READY_PROBE_TIMEOUT_MS, Math.max(1, deadline - Date.now())) + }) + if (confirmation.eventId === 'unsupported_operation' || !confirmation.targets.includes(name)) { + throw new Error(`Broker did not confirm delivery injection for ${name}`) + } + return + } catch (err) { + throw new Error( + `Workforce persona ${personaId} launched but did not become ready for broker delivery: ${toErrorMessage(err)}` + ) + } + } + async generateCommitDraft(projectId: string, diff: string): Promise { const session = this.getSessionForProject(projectId) const selectedDiff = diff.trim() @@ -3380,7 +3615,8 @@ export class BrokerManager { inboundDeliveryMode?: InboundDeliveryMode brokerKind?: 'local' | 'cloud' }>> { - const agents = await session.client.listAgents() + const agents = (await session.client.listAgents()) + .filter((agent) => !this.isPersonaReadinessPending(session, agent.name)) // A successful poll means the broker is answering again — clear any wedge // streak so a future timeout starts counting from zero. this.brokerTimeoutCounts.delete(`${session.projectId}:listAgents`) @@ -3496,14 +3732,15 @@ export class BrokerManager { 'Broker status' ) return { - agents: status.agents, + agents: status.agents.filter((agent) => !this.isPersonaReadinessPending(session, agent.name)), pendingDeliveryCount: status.pending_delivery_count, auth: status.auth } } catch (statusErr) { try { return { - agents: await withBrokerDetailsTimeout(session.client.listAgents(), 'Agent list'), + agents: (await withBrokerDetailsTimeout(session.client.listAgents(), 'Agent list')) + .filter((agent) => !this.isPersonaReadinessPending(session, agent.name)), pendingDeliveryCount: 0 } } catch (listErr) { diff --git a/src/main/ipc-handlers.test.ts b/src/main/ipc-handlers.test.ts index 4483d743..a95b83c4 100644 --- a/src/main/ipc-handlers.test.ts +++ b/src/main/ipc-handlers.test.ts @@ -63,6 +63,9 @@ const mock = vi.hoisted(() => { updateHistoricalDownload: vi.fn(), disconnect: vi.fn() }, + integrationEventBridge: { + invalidateProjectAgentCache: vi.fn() + }, proactiveAgentManager: { onEvent: vi.fn() } @@ -118,9 +121,7 @@ vi.mock('./integrations', () => ({ })) vi.mock('./integration-event-bridge', () => ({ getIntegrationEventTelemetrySnapshot: vi.fn(() => ({})), - integrationEventBridge: { - invalidateProjectAgentCache: vi.fn() - } + integrationEventBridge: mock.integrationEventBridge })) vi.mock('./ai-hist', () => ({ aiHistManager: {} @@ -197,3 +198,27 @@ describe('registerIpcHandlers broker:spawn-agent', () => { expect(() => structuredClone(result)).not.toThrow() }) }) + +describe('registerIpcHandlers broker:spawn-persona', () => { + beforeEach(() => { + mock.handlers.clear() + mock.ipcMain.handle.mockClear() + mock.ipcMain.on.mockClear() + mock.brokerManager.spawnPersona.mockReset() + mock.integrationEventBridge.invalidateProjectAgentCache.mockClear() + mock.brokerManager.spawnPersona.mockResolvedValue({ name: 'persona', runtime: 'pty', cli: 'claude' }) + registerIpcHandlers() + }) + + it('invalidates the integration agent cache after persona spawn settles', async () => { + const handler = mock.handlers.get('broker:spawn-persona') + expect(handler).toBeTypeOf('function') + + const result = await handler?.({}, 'project-1', 'autonomous-actor') + + expect(result).toEqual({ name: 'persona', runtime: 'pty', cli: 'claude' }) + expect(mock.brokerManager.spawnPersona).toHaveBeenCalledWith('project-1', 'autonomous-actor') + expect(mock.integrationEventBridge.invalidateProjectAgentCache).toHaveBeenCalledTimes(1) + expect(mock.integrationEventBridge.invalidateProjectAgentCache).toHaveBeenCalledWith('project-1') + }) +}) diff --git a/src/main/ipc-handlers.ts b/src/main/ipc-handlers.ts index 941d5b04..b204e54a 100644 --- a/src/main/ipc-handlers.ts +++ b/src/main/ipc-handlers.ts @@ -270,9 +270,12 @@ export function registerIpcHandlers(): void { }) ipcMain.handle('broker:spawn-persona', async (_, projectId: string, personaId: string) => { - const result = await brokerManager.spawnPersona(projectId, personaId) - integrationEventBridge.invalidateProjectAgentCache(projectId) - return toBrokerSpawnAgentResult(result) + try { + const result = await brokerManager.spawnPersona(projectId, personaId) + return toBrokerSpawnAgentResult(result) + } finally { + integrationEventBridge.invalidateProjectAgentCache(projectId) + } }) ipcMain.handle('broker:attach-terminal', async (_, input: {