From 8224de31e6ce35909d9410ebc8b84141e4210b21 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 6 Jun 2026 17:20:56 +0200 Subject: [PATCH 1/2] Return clone-safe persona spawn payloads --- src/main/broker.test.ts | 49 ++++++++++++++++++++++++++++++++++++ src/main/broker.ts | 56 +++++++++++++++++++++++++---------------- 2 files changed, 84 insertions(+), 21 deletions(-) diff --git a/src/main/broker.test.ts b/src/main/broker.test.ts index a2eb3d5b..fa405892 100644 --- a/src/main/broker.test.ts +++ b/src/main/broker.test.ts @@ -565,6 +565,55 @@ describe('BrokerManager local + cloud coexistence', () => { await manager.shutdown() }) + it('returns a clone-safe payload when spawning a workforce persona', async () => { + const tempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-')) + const binaryName = process.platform === 'win32' ? 'agentworkforce.cmd' : 'agentworkforce' + const agentworkforceBin = join(tempDir, 'node_modules', '.bin', binaryName) + await mkdir(dirname(agentworkforceBin), { recursive: true }) + await writeFile( + agentworkforceBin, + [ + '#!/usr/bin/env node', + 'const command = process.argv[2]', + "if (command === 'show') {", + " console.log(JSON.stringify({ spec: { id: 'autonomous-actor', harness: 'claude' } }))", + '} else {', + " console.log(JSON.stringify({ personas: [{ persona: 'autonomous-actor', harness: 'claude' }] }))", + '}' + ].join('\n') + ) + await chmod(agentworkforceBin, 0o755) + + try { + const manager = new BrokerManager() + mock.state.nextLocalAgents = [] + await manager.start(PROJECT_ID, tempDir, 'pear-project-1', undefined as never, []) + const local = lastSpawned() + local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { + local.agentNames.push(input.name) + return { + name: input.name, + runtime: 'pty', + cli: 'agentworkforce', + nonCloneable: () => undefined + } + }) + + const result = await manager.spawnPersona(PROJECT_ID, 'autonomous-actor') + + expect(result).toEqual({ + name: 'autonomous-actor', + runtime: 'pty', + cli: 'claude' + }) + expect(() => structuredClone(result)).not.toThrow() + + await manager.shutdown() + } finally { + await rm(tempDir, { recursive: true, force: true }) + } + }) + it('spawning with broker: cloud fails clearly when no sandbox is attached', async () => { const manager = new BrokerManager() await startLocal(manager) diff --git a/src/main/broker.ts b/src/main/broker.ts index 4c247019..6186736f 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -393,6 +393,30 @@ function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null } +type BrokerSpawnResult = { + name: string + runtime: string + cli?: string +} + +function normalizeSpawnPtyResult(value: unknown, fallbackName: string, cli?: string): BrokerSpawnResult { + const record = isRecord(value) ? value : {} + const name = typeof record.name === 'string' && record.name.trim() + ? record.name.trim() + : fallbackName + const runtime = typeof record.runtime === 'string' && record.runtime.trim() + ? record.runtime.trim() + : 'pty' + const result: BrokerSpawnResult = { name, runtime } + const resolvedCli = typeof cli === 'string' && cli.trim() + ? cli.trim() + : typeof record.cli === 'string' && record.cli.trim() + ? record.cli.trim() + : undefined + if (resolvedCli) result.cli = resolvedCli + return result +} + function brokerEventString(event: BrokerEvent, key: string): string | undefined { const value = (event as unknown as Record)[key] return typeof value === 'string' ? value : undefined @@ -2306,12 +2330,12 @@ export class BrokerManager { projectId: string, spawnInput: SpawnPtyInput & { broker?: 'local' | 'cloud' }, options: { parentAgentName?: string } = {} - ): Promise<{ name: string; runtime: string }> { + ): Promise { const requestKey = spawnRequestKey(projectId, spawnInput, options) const inFlight = this.inFlightSpawnRequests.get(requestKey) if (inFlight) return inFlight - let promise!: Promise<{ name: string; runtime: string }> + let promise!: Promise promise = this.spawnAgentOnce(projectId, spawnInput, options).finally(() => { if (this.inFlightSpawnRequests.get(requestKey) === promise) { this.inFlightSpawnRequests.delete(requestKey) @@ -2325,7 +2349,7 @@ export class BrokerManager { projectId: string, spawnInput: SpawnPtyInput & { broker?: 'local' | 'cloud' }, options: { parentAgentName?: string } = {} - ): Promise<{ name: string; runtime: string }> { + ): Promise { // `broker` selects which of the project's sessions the agent spawns on. // Default: local-first via getSessionForProject (cloud only when no local // broker is running, preserving the cloud-only flow). @@ -2377,7 +2401,8 @@ export class BrokerManager { ) } const spawned = await session.client.spawnPty(nextInput) - const spawnedName = spawned.name || nextInput.name + const safeSpawned = normalizeSpawnPtyResult(spawned, nextInput.name) + const spawnedName = safeSpawned.name this.rememberAgentSession(spawnedName, sessionKeyFor(session)) const burnInput = { ...nextInput, name: spawnedName } const lineage = session.pearLineage.get(spawnedName) @@ -2393,12 +2418,7 @@ export class BrokerManager { ).catch((err) => { console.warn('[burn-spawn-hook] post-spawn burn stamp failed:', err) }) - return { - name: spawnedName, - runtime: typeof spawned.runtime === 'string' && spawned.runtime.trim() - ? spawned.runtime - : 'pty' - } + return safeSpawned } catch (err) { if (!isAgentNameConflict(err)) { throw buildSpawnFailureError(err, nextInput, session.cloudSandboxId ? 'cloud' : 'local') @@ -2425,7 +2445,7 @@ export class BrokerManager { } } - async spawnPersona(projectId: string, personaId: string): Promise<{ name: string; runtime: string; cli?: string }> { + async spawnPersona(projectId: string, personaId: string): Promise { const session = this.getSessionForProject(projectId) const trimmedPersonaId = personaId.trim() if (!trimmedPersonaId) { @@ -2478,7 +2498,7 @@ export class BrokerManager { command: { cli: string; args: string[] } resolvedHarness: string } - ): Promise<{ name: string; runtime: string; cli?: string }> { + ): Promise { const existingNames = new Set( (await session.client.listAgents()).map((agent) => agent.name) ) @@ -2495,15 +2515,9 @@ export class BrokerManager { for (let attempt = 0; attempt < 20; attempt += 1) { try { const spawned = await session.client.spawnPty(nextInput) - const spawnedName = spawned.name || nextInput.name - this.rememberAgentSession(spawnedName, sessionKeyFor(session)) - return { - name: spawnedName, - runtime: typeof spawned.runtime === 'string' && spawned.runtime.trim() - ? spawned.runtime - : 'pty', - cli: input.resolvedHarness - } + const safeSpawned = normalizeSpawnPtyResult(spawned, nextInput.name, input.resolvedHarness) + this.rememberAgentSession(safeSpawned.name, sessionKeyFor(session)) + return safeSpawned } catch (err) { if (!isAgentNameConflict(err)) { throw err From a0eb70e355bf55da4bc37d907e952af5d0ea2943 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sun, 7 Jun 2026 08:52:43 +0200 Subject: [PATCH 2/2] test(broker): cover duplicate persona spawn coalescing --- src/main/broker.test.ts | 135 ++++++++++++++++++++++++++++------------ src/main/broker.ts | 27 +++++++- 2 files changed, 119 insertions(+), 43 deletions(-) diff --git a/src/main/broker.test.ts b/src/main/broker.test.ts index fa405892..5d83b639 100644 --- a/src/main/broker.test.ts +++ b/src/main/broker.test.ts @@ -220,6 +220,29 @@ async function attachCloud(manager: BrokerManager, agents: string[] = []): Promi return lastConstructed() } +async function writeAgentWorkforceFixture(projectDir: string): Promise { + const binDir = join(projectDir, 'node_modules', '.bin') + const posixBin = join(binDir, 'agentworkforce') + const winBin = join(binDir, 'agentworkforce.cmd') + const jsBin = join(binDir, 'agentworkforce.js') + const script = [ + 'const command = process.argv[2]', + "if (command === 'show') {", + " console.log(JSON.stringify({ spec: { id: 'autonomous-actor', harness: 'claude' } }))", + '} else {', + " console.log(JSON.stringify({ personas: [{ persona: 'autonomous-actor', harness: 'claude' }] }))", + '}' + ].join('\n') + + await mkdir(binDir, { recursive: true }) + await writeFile(jsBin, script) + await writeFile(posixBin, `#!/usr/bin/env node\n${script}\n`) + await writeFile(winBin, `@echo off\r\nnode "%~dp0agentworkforce.js" %*\r\n`) + await chmod(jsBin, 0o755) + await chmod(posixBin, 0o755) + await chmod(winBin, 0o755) +} + describe('resolveAgentRelayMcpCommand', () => { let tempDir: string | null = null let extraTempDir: string | null = null @@ -328,6 +351,8 @@ describe('resolveAgentRelayMcpCommand', () => { }) describe('BrokerManager local + cloud coexistence', () => { + let personaTempDir: string | null = null + beforeEach(() => { mock.state.spawnedClients.length = 0 mock.state.constructedClients.length = 0 @@ -340,6 +365,11 @@ describe('BrokerManager local + cloud coexistence', () => { mock.HarnessDriverClient.connect.mockClear() }) + afterEach(async () => { + if (personaTempDir) await rm(personaTempDir, { recursive: true, force: true }) + personaTempDir = null + }) + it('keeps the local session alive when a cloud sandbox attaches', async () => { const manager = new BrokerManager() const local = await startLocal(manager, ['local-agent']) @@ -566,52 +596,75 @@ describe('BrokerManager local + cloud coexistence', () => { }) it('returns a clone-safe payload when spawning a workforce persona', async () => { - const tempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-')) - const binaryName = process.platform === 'win32' ? 'agentworkforce.cmd' : 'agentworkforce' - const agentworkforceBin = join(tempDir, 'node_modules', '.bin', binaryName) - await mkdir(dirname(agentworkforceBin), { recursive: true }) - await writeFile( - agentworkforceBin, - [ - '#!/usr/bin/env node', - 'const command = process.argv[2]', - "if (command === 'show') {", - " console.log(JSON.stringify({ spec: { id: 'autonomous-actor', harness: 'claude' } }))", - '} else {', - " console.log(JSON.stringify({ personas: [{ persona: 'autonomous-actor', harness: 'claude' }] }))", - '}' - ].join('\n') - ) - await chmod(agentworkforceBin, 0o755) + personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-')) + await writeAgentWorkforceFixture(personaTempDir) - try { - const manager = new BrokerManager() - mock.state.nextLocalAgents = [] - await manager.start(PROJECT_ID, tempDir, 'pear-project-1', undefined as never, []) - const local = lastSpawned() - local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { - local.agentNames.push(input.name) - return { - name: input.name, - runtime: 'pty', - cli: 'agentworkforce', - nonCloneable: () => undefined - } - }) + const manager = new BrokerManager() + mock.state.nextLocalAgents = [] + await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, []) + const local = lastSpawned() + local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { + local.agentNames.push(input.name) + return { + name: input.name, + runtime: 'pty', + cli: 'agentworkforce', + nonCloneable: () => undefined + } + }) - const result = await manager.spawnPersona(PROJECT_ID, 'autonomous-actor') + const result = await manager.spawnPersona(PROJECT_ID, 'autonomous-actor') - expect(result).toEqual({ - name: 'autonomous-actor', - runtime: 'pty', - cli: 'claude' + expect(result).toEqual({ + name: 'autonomous-actor', + runtime: 'pty', + cli: 'claude' + }) + expect(() => structuredClone(result)).not.toThrow() + + await manager.shutdown() + }) + + it('coalesces concurrent duplicate workforce persona spawn requests', async () => { + personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-')) + await writeAgentWorkforceFixture(personaTempDir) + + const manager = new BrokerManager() + mock.state.nextLocalAgents = [] + await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, []) + const local = lastSpawned() + let releaseSpawn!: () => void + local.spawnPty.mockImplementationOnce(async (input: { name: string }) => { + await new Promise((resolve) => { + releaseSpawn = resolve }) - expect(() => structuredClone(result)).not.toThrow() + local.agentNames.push(input.name) + return { + name: input.name, + runtime: 'pty', + cli: 'agentworkforce', + nonCloneable: () => undefined + } + }) - await manager.shutdown() - } finally { - await rm(tempDir, { recursive: true, force: true }) - } + const first = manager.spawnPersona(PROJECT_ID, 'autonomous-actor') + const second = manager.spawnPersona(PROJECT_ID, 'autonomous-actor') + await Promise.resolve() + await Promise.resolve() + + expect(local.spawnPty).toHaveBeenCalledTimes(1) + releaseSpawn() + const results = await Promise.all([first, second]) + + expect(results).toEqual([ + { name: 'autonomous-actor', runtime: 'pty', cli: 'claude' }, + { name: 'autonomous-actor', runtime: 'pty', cli: 'claude' } + ]) + expect(() => structuredClone(results[0])).not.toThrow() + expect(() => structuredClone(results[1])).not.toThrow() + expect(local.agentNames).toEqual(['autonomous-actor']) + + await manager.shutdown() }) it('spawning with broker: cloud fails clearly when no sandbox is attached', async () => { diff --git a/src/main/broker.ts b/src/main/broker.ts index 6186736f..4c584c79 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -389,6 +389,13 @@ function spawnRequestKey( }) } +function personaSpawnRequestKey(projectId: string, personaId: string): string { + return JSON.stringify({ + projectId, + personaId + }) +} + function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null } @@ -1200,7 +1207,7 @@ export class BrokerManager { private sessions = new Map() private startPromises = new Map>() private revivePromises = new Map>() - private inFlightSpawnRequests = new Map>() + private inFlightSpawnRequests = new Map>() // Which broker sessions (by session key) an agent name is registered on. // Both a project's local and cloud brokers join the same relay workspace, // so agent names are project-unique in practice — the set tracks which @@ -2446,12 +2453,28 @@ export class BrokerManager { } async spawnPersona(projectId: string, personaId: string): Promise { - const session = this.getSessionForProject(projectId) const trimmedPersonaId = personaId.trim() if (!trimmedPersonaId) { throw new Error('Persona id is required') } + const requestKey = personaSpawnRequestKey(projectId, trimmedPersonaId) + const inFlight = this.inFlightSpawnRequests.get(requestKey) + if (inFlight) return inFlight + + let promise!: Promise + promise = this.spawnPersonaOnce(projectId, trimmedPersonaId).finally(() => { + if (this.inFlightSpawnRequests.get(requestKey) === promise) { + this.inFlightSpawnRequests.delete(requestKey) + } + }) + this.inFlightSpawnRequests.set(requestKey, promise) + return promise + } + + private async spawnPersonaOnce(projectId: string, trimmedPersonaId: string): Promise { + const session = this.getSessionForProject(projectId) + const command = resolveAgentWorkforceCommand(session.cwd) const persona = findWorkforcePersona(session.cwd, trimmedPersonaId, command)