Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions src/main/broker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,29 @@ async function attachCloud(manager: BrokerManager, agents: string[] = []): Promi
return lastConstructed()
}

async function writeAgentWorkforceFixture(projectDir: string): Promise<void> {
const binDir = join(projectDir, 'node_modules', '.bin')
const posixBin = join(binDir, 'agentworkforce')
const winBin = join(binDir, 'agentworkforce.cmd')
const jsBin = join(binDir, 'agentworkforce.js')
const script = [
'const command = process.argv[2]',
"if (command === 'show') {",
" console.log(JSON.stringify({ spec: { id: 'autonomous-actor', harness: 'claude' } }))",
'} else {',
" console.log(JSON.stringify({ personas: [{ persona: 'autonomous-actor', harness: 'claude' }] }))",
'}'
].join('\n')

await mkdir(binDir, { recursive: true })
await writeFile(jsBin, script)
await writeFile(posixBin, `#!/usr/bin/env node\n${script}\n`)
await writeFile(winBin, `@echo off\r\nnode "%~dp0agentworkforce.js" %*\r\n`)
await chmod(jsBin, 0o755)
await chmod(posixBin, 0o755)
await chmod(winBin, 0o755)
}

describe('resolveAgentRelayMcpCommand', () => {
let tempDir: string | null = null
let extraTempDir: string | null = null
Expand Down Expand Up @@ -328,6 +351,8 @@ describe('resolveAgentRelayMcpCommand', () => {
})

describe('BrokerManager local + cloud coexistence', () => {
let personaTempDir: string | null = null

beforeEach(() => {
mock.state.spawnedClients.length = 0
mock.state.constructedClients.length = 0
Expand All @@ -340,6 +365,11 @@ describe('BrokerManager local + cloud coexistence', () => {
mock.HarnessDriverClient.connect.mockClear()
})

afterEach(async () => {
if (personaTempDir) await rm(personaTempDir, { recursive: true, force: true })
personaTempDir = null
})

it('keeps the local session alive when a cloud sandbox attaches', async () => {
const manager = new BrokerManager()
const local = await startLocal(manager, ['local-agent'])
Expand Down Expand Up @@ -565,6 +595,78 @@ describe('BrokerManager local + cloud coexistence', () => {
await manager.shutdown()
})

it('returns a clone-safe payload when spawning a workforce persona', async () => {
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
await writeAgentWorkforceFixture(personaTempDir)

const manager = new BrokerManager()
mock.state.nextLocalAgents = []
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
const local = lastSpawned()
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
local.agentNames.push(input.name)
return {
name: input.name,
runtime: 'pty',
cli: 'agentworkforce',
nonCloneable: () => undefined
}
})

const result = await manager.spawnPersona(PROJECT_ID, 'autonomous-actor')

expect(result).toEqual({
name: 'autonomous-actor',
runtime: 'pty',
cli: 'claude'
})
expect(() => structuredClone(result)).not.toThrow()

await manager.shutdown()
})

it('coalesces concurrent duplicate workforce persona spawn requests', async () => {
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
await writeAgentWorkforceFixture(personaTempDir)

const manager = new BrokerManager()
mock.state.nextLocalAgents = []
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
const local = lastSpawned()
let releaseSpawn!: () => void
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
await new Promise<void>((resolve) => {
releaseSpawn = resolve
})
local.agentNames.push(input.name)
return {
name: input.name,
runtime: 'pty',
cli: 'agentworkforce',
nonCloneable: () => undefined
}
})

const first = manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
const second = manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
await Promise.resolve()
await Promise.resolve()

expect(local.spawnPty).toHaveBeenCalledTimes(1)
releaseSpawn()
const results = await Promise.all([first, second])

expect(results).toEqual([
{ name: 'autonomous-actor', runtime: 'pty', cli: 'claude' },
{ name: 'autonomous-actor', runtime: 'pty', cli: 'claude' }
])
expect(() => structuredClone(results[0])).not.toThrow()
expect(() => structuredClone(results[1])).not.toThrow()
expect(local.agentNames).toEqual(['autonomous-actor'])

await manager.shutdown()
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.

it('spawning with broker: cloud fails clearly when no sandbox is attached', async () => {
const manager = new BrokerManager()
await startLocal(manager)
Expand Down
83 changes: 60 additions & 23 deletions src/main/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,41 @@ function spawnRequestKey(
})
}

function personaSpawnRequestKey(projectId: string, personaId: string): string {
return JSON.stringify({
projectId,
personaId
})
}

function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null
}

type BrokerSpawnResult = {
name: string
runtime: string
cli?: string
}

function normalizeSpawnPtyResult(value: unknown, fallbackName: string, cli?: string): BrokerSpawnResult {
const record = isRecord(value) ? value : {}
const name = typeof record.name === 'string' && record.name.trim()
? record.name.trim()
: fallbackName
const runtime = typeof record.runtime === 'string' && record.runtime.trim()
? record.runtime.trim()
: 'pty'
const result: BrokerSpawnResult = { name, runtime }
const resolvedCli = typeof cli === 'string' && cli.trim()
? cli.trim()
: typeof record.cli === 'string' && record.cli.trim()
? record.cli.trim()
: undefined
if (resolvedCli) result.cli = resolvedCli
return result
}

function brokerEventString(event: BrokerEvent, key: string): string | undefined {
const value = (event as unknown as Record<string, unknown>)[key]
return typeof value === 'string' ? value : undefined
Expand Down Expand Up @@ -1176,7 +1207,7 @@ export class BrokerManager {
private sessions = new Map<string, BrokerSession>()
private startPromises = new Map<string, Promise<boolean | void>>()
private revivePromises = new Map<string, Promise<boolean>>()
private inFlightSpawnRequests = new Map<string, Promise<{ name: string; runtime: string }>>()
private inFlightSpawnRequests = new Map<string, Promise<BrokerSpawnResult>>()
// Which broker sessions (by session key) an agent name is registered on.
// Both a project's local and cloud brokers join the same relay workspace,
// so agent names are project-unique in practice — the set tracks which
Expand Down Expand Up @@ -2306,12 +2337,12 @@ export class BrokerManager {
projectId: string,
spawnInput: SpawnPtyInput & { broker?: 'local' | 'cloud' },
options: { parentAgentName?: string } = {}
): Promise<{ name: string; runtime: string }> {
): Promise<BrokerSpawnResult> {
const requestKey = spawnRequestKey(projectId, spawnInput, options)
const inFlight = this.inFlightSpawnRequests.get(requestKey)
if (inFlight) return inFlight

let promise!: Promise<{ name: string; runtime: string }>
let promise!: Promise<BrokerSpawnResult>
promise = this.spawnAgentOnce(projectId, spawnInput, options).finally(() => {
if (this.inFlightSpawnRequests.get(requestKey) === promise) {
this.inFlightSpawnRequests.delete(requestKey)
Expand All @@ -2325,7 +2356,7 @@ export class BrokerManager {
projectId: string,
spawnInput: SpawnPtyInput & { broker?: 'local' | 'cloud' },
options: { parentAgentName?: string } = {}
): Promise<{ name: string; runtime: string }> {
): Promise<BrokerSpawnResult> {
// `broker` selects which of the project's sessions the agent spawns on.
// Default: local-first via getSessionForProject (cloud only when no local
// broker is running, preserving the cloud-only flow).
Expand Down Expand Up @@ -2377,7 +2408,8 @@ export class BrokerManager {
)
}
const spawned = await session.client.spawnPty(nextInput)
const spawnedName = spawned.name || nextInput.name
const safeSpawned = normalizeSpawnPtyResult(spawned, nextInput.name)
const spawnedName = safeSpawned.name
this.rememberAgentSession(spawnedName, sessionKeyFor(session))
const burnInput = { ...nextInput, name: spawnedName }
const lineage = session.pearLineage.get(spawnedName)
Expand All @@ -2393,12 +2425,7 @@ export class BrokerManager {
).catch((err) => {
console.warn('[burn-spawn-hook] post-spawn burn stamp failed:', err)
})
return {
name: spawnedName,
runtime: typeof spawned.runtime === 'string' && spawned.runtime.trim()
? spawned.runtime
: 'pty'
}
return safeSpawned
} catch (err) {
if (!isAgentNameConflict(err)) {
throw buildSpawnFailureError(err, nextInput, session.cloudSandboxId ? 'cloud' : 'local')
Expand All @@ -2425,13 +2452,29 @@ export class BrokerManager {
}
}

async spawnPersona(projectId: string, personaId: string): Promise<{ name: string; runtime: string; cli?: string }> {
const session = this.getSessionForProject(projectId)
async spawnPersona(projectId: string, personaId: string): Promise<BrokerSpawnResult> {
const trimmedPersonaId = personaId.trim()
if (!trimmedPersonaId) {
throw new Error('Persona id is required')
}

const requestKey = personaSpawnRequestKey(projectId, trimmedPersonaId)
const inFlight = this.inFlightSpawnRequests.get(requestKey)
if (inFlight) return inFlight

let promise!: Promise<BrokerSpawnResult>
promise = this.spawnPersonaOnce(projectId, trimmedPersonaId).finally(() => {
if (this.inFlightSpawnRequests.get(requestKey) === promise) {
this.inFlightSpawnRequests.delete(requestKey)
}
})
this.inFlightSpawnRequests.set(requestKey, promise)
return promise
}

private async spawnPersonaOnce(projectId: string, trimmedPersonaId: string): Promise<BrokerSpawnResult> {
const session = this.getSessionForProject(projectId)

const command = resolveAgentWorkforceCommand(session.cwd)
const persona = findWorkforcePersona(session.cwd, trimmedPersonaId, command)

Expand Down Expand Up @@ -2478,7 +2521,7 @@ export class BrokerManager {
command: { cli: string; args: string[] }
resolvedHarness: string
}
): Promise<{ name: string; runtime: string; cli?: string }> {
): Promise<BrokerSpawnResult> {
const existingNames = new Set(
(await session.client.listAgents()).map((agent) => agent.name)
)
Expand All @@ -2495,15 +2538,9 @@ export class BrokerManager {
for (let attempt = 0; attempt < 20; attempt += 1) {
try {
const spawned = await session.client.spawnPty(nextInput)
const spawnedName = spawned.name || nextInput.name
this.rememberAgentSession(spawnedName, sessionKeyFor(session))
return {
name: spawnedName,
runtime: typeof spawned.runtime === 'string' && spawned.runtime.trim()
? spawned.runtime
: 'pty',
cli: input.resolvedHarness
}
const safeSpawned = normalizeSpawnPtyResult(spawned, nextInput.name, input.resolvedHarness)
this.rememberAgentSession(safeSpawned.name, sessionKeyFor(session))
return safeSpawned
} catch (err) {
if (!isAgentNameConflict(err)) {
throw err
Expand Down
Loading