Skip to content

Commit a0eb70e

Browse files
author
kjgbot
committed
test(broker): cover duplicate persona spawn coalescing
1 parent 8224de3 commit a0eb70e

2 files changed

Lines changed: 119 additions & 43 deletions

File tree

src/main/broker.test.ts

Lines changed: 94 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,29 @@ async function attachCloud(manager: BrokerManager, agents: string[] = []): Promi
220220
return lastConstructed()
221221
}
222222

223+
async function writeAgentWorkforceFixture(projectDir: string): Promise<void> {
224+
const binDir = join(projectDir, 'node_modules', '.bin')
225+
const posixBin = join(binDir, 'agentworkforce')
226+
const winBin = join(binDir, 'agentworkforce.cmd')
227+
const jsBin = join(binDir, 'agentworkforce.js')
228+
const script = [
229+
'const command = process.argv[2]',
230+
"if (command === 'show') {",
231+
" console.log(JSON.stringify({ spec: { id: 'autonomous-actor', harness: 'claude' } }))",
232+
'} else {',
233+
" console.log(JSON.stringify({ personas: [{ persona: 'autonomous-actor', harness: 'claude' }] }))",
234+
'}'
235+
].join('\n')
236+
237+
await mkdir(binDir, { recursive: true })
238+
await writeFile(jsBin, script)
239+
await writeFile(posixBin, `#!/usr/bin/env node\n${script}\n`)
240+
await writeFile(winBin, `@echo off\r\nnode "%~dp0agentworkforce.js" %*\r\n`)
241+
await chmod(jsBin, 0o755)
242+
await chmod(posixBin, 0o755)
243+
await chmod(winBin, 0o755)
244+
}
245+
223246
describe('resolveAgentRelayMcpCommand', () => {
224247
let tempDir: string | null = null
225248
let extraTempDir: string | null = null
@@ -328,6 +351,8 @@ describe('resolveAgentRelayMcpCommand', () => {
328351
})
329352

330353
describe('BrokerManager local + cloud coexistence', () => {
354+
let personaTempDir: string | null = null
355+
331356
beforeEach(() => {
332357
mock.state.spawnedClients.length = 0
333358
mock.state.constructedClients.length = 0
@@ -340,6 +365,11 @@ describe('BrokerManager local + cloud coexistence', () => {
340365
mock.HarnessDriverClient.connect.mockClear()
341366
})
342367

368+
afterEach(async () => {
369+
if (personaTempDir) await rm(personaTempDir, { recursive: true, force: true })
370+
personaTempDir = null
371+
})
372+
343373
it('keeps the local session alive when a cloud sandbox attaches', async () => {
344374
const manager = new BrokerManager()
345375
const local = await startLocal(manager, ['local-agent'])
@@ -566,52 +596,75 @@ describe('BrokerManager local + cloud coexistence', () => {
566596
})
567597

568598
it('returns a clone-safe payload when spawning a workforce persona', async () => {
569-
const tempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
570-
const binaryName = process.platform === 'win32' ? 'agentworkforce.cmd' : 'agentworkforce'
571-
const agentworkforceBin = join(tempDir, 'node_modules', '.bin', binaryName)
572-
await mkdir(dirname(agentworkforceBin), { recursive: true })
573-
await writeFile(
574-
agentworkforceBin,
575-
[
576-
'#!/usr/bin/env node',
577-
'const command = process.argv[2]',
578-
"if (command === 'show') {",
579-
" console.log(JSON.stringify({ spec: { id: 'autonomous-actor', harness: 'claude' } }))",
580-
'} else {',
581-
" console.log(JSON.stringify({ personas: [{ persona: 'autonomous-actor', harness: 'claude' }] }))",
582-
'}'
583-
].join('\n')
584-
)
585-
await chmod(agentworkforceBin, 0o755)
599+
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
600+
await writeAgentWorkforceFixture(personaTempDir)
586601

587-
try {
588-
const manager = new BrokerManager()
589-
mock.state.nextLocalAgents = []
590-
await manager.start(PROJECT_ID, tempDir, 'pear-project-1', undefined as never, [])
591-
const local = lastSpawned()
592-
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
593-
local.agentNames.push(input.name)
594-
return {
595-
name: input.name,
596-
runtime: 'pty',
597-
cli: 'agentworkforce',
598-
nonCloneable: () => undefined
599-
}
600-
})
602+
const manager = new BrokerManager()
603+
mock.state.nextLocalAgents = []
604+
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
605+
const local = lastSpawned()
606+
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
607+
local.agentNames.push(input.name)
608+
return {
609+
name: input.name,
610+
runtime: 'pty',
611+
cli: 'agentworkforce',
612+
nonCloneable: () => undefined
613+
}
614+
})
601615

602-
const result = await manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
616+
const result = await manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
603617

604-
expect(result).toEqual({
605-
name: 'autonomous-actor',
606-
runtime: 'pty',
607-
cli: 'claude'
618+
expect(result).toEqual({
619+
name: 'autonomous-actor',
620+
runtime: 'pty',
621+
cli: 'claude'
622+
})
623+
expect(() => structuredClone(result)).not.toThrow()
624+
625+
await manager.shutdown()
626+
})
627+
628+
it('coalesces concurrent duplicate workforce persona spawn requests', async () => {
629+
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
630+
await writeAgentWorkforceFixture(personaTempDir)
631+
632+
const manager = new BrokerManager()
633+
mock.state.nextLocalAgents = []
634+
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
635+
const local = lastSpawned()
636+
let releaseSpawn!: () => void
637+
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
638+
await new Promise<void>((resolve) => {
639+
releaseSpawn = resolve
608640
})
609-
expect(() => structuredClone(result)).not.toThrow()
641+
local.agentNames.push(input.name)
642+
return {
643+
name: input.name,
644+
runtime: 'pty',
645+
cli: 'agentworkforce',
646+
nonCloneable: () => undefined
647+
}
648+
})
610649

611-
await manager.shutdown()
612-
} finally {
613-
await rm(tempDir, { recursive: true, force: true })
614-
}
650+
const first = manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
651+
const second = manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
652+
await Promise.resolve()
653+
await Promise.resolve()
654+
655+
expect(local.spawnPty).toHaveBeenCalledTimes(1)
656+
releaseSpawn()
657+
const results = await Promise.all([first, second])
658+
659+
expect(results).toEqual([
660+
{ name: 'autonomous-actor', runtime: 'pty', cli: 'claude' },
661+
{ name: 'autonomous-actor', runtime: 'pty', cli: 'claude' }
662+
])
663+
expect(() => structuredClone(results[0])).not.toThrow()
664+
expect(() => structuredClone(results[1])).not.toThrow()
665+
expect(local.agentNames).toEqual(['autonomous-actor'])
666+
667+
await manager.shutdown()
615668
})
616669

617670
it('spawning with broker: cloud fails clearly when no sandbox is attached', async () => {

src/main/broker.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,13 @@ function spawnRequestKey(
389389
})
390390
}
391391

392+
function personaSpawnRequestKey(projectId: string, personaId: string): string {
393+
return JSON.stringify({
394+
projectId,
395+
personaId
396+
})
397+
}
398+
392399
function isRecord(value: unknown): value is Record<string, unknown> {
393400
return typeof value === 'object' && value !== null
394401
}
@@ -1200,7 +1207,7 @@ export class BrokerManager {
12001207
private sessions = new Map<string, BrokerSession>()
12011208
private startPromises = new Map<string, Promise<boolean | void>>()
12021209
private revivePromises = new Map<string, Promise<boolean>>()
1203-
private inFlightSpawnRequests = new Map<string, Promise<{ name: string; runtime: string }>>()
1210+
private inFlightSpawnRequests = new Map<string, Promise<BrokerSpawnResult>>()
12041211
// Which broker sessions (by session key) an agent name is registered on.
12051212
// Both a project's local and cloud brokers join the same relay workspace,
12061213
// so agent names are project-unique in practice — the set tracks which
@@ -2446,12 +2453,28 @@ export class BrokerManager {
24462453
}
24472454

24482455
async spawnPersona(projectId: string, personaId: string): Promise<BrokerSpawnResult> {
2449-
const session = this.getSessionForProject(projectId)
24502456
const trimmedPersonaId = personaId.trim()
24512457
if (!trimmedPersonaId) {
24522458
throw new Error('Persona id is required')
24532459
}
24542460

2461+
const requestKey = personaSpawnRequestKey(projectId, trimmedPersonaId)
2462+
const inFlight = this.inFlightSpawnRequests.get(requestKey)
2463+
if (inFlight) return inFlight
2464+
2465+
let promise!: Promise<BrokerSpawnResult>
2466+
promise = this.spawnPersonaOnce(projectId, trimmedPersonaId).finally(() => {
2467+
if (this.inFlightSpawnRequests.get(requestKey) === promise) {
2468+
this.inFlightSpawnRequests.delete(requestKey)
2469+
}
2470+
})
2471+
this.inFlightSpawnRequests.set(requestKey, promise)
2472+
return promise
2473+
}
2474+
2475+
private async spawnPersonaOnce(projectId: string, trimmedPersonaId: string): Promise<BrokerSpawnResult> {
2476+
const session = this.getSessionForProject(projectId)
2477+
24552478
const command = resolveAgentWorkforceCommand(session.cwd)
24562479
const persona = findWorkforcePersona(session.cwd, trimmedPersonaId, command)
24572480

0 commit comments

Comments
 (0)