Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 234 additions & 2 deletions src/main/broker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,25 @@ type MockClient = {
unsubscribeChannels: ReturnType<typeof vi.fn>
getStatus: ReturnType<typeof vi.fn>
sendMessage: ReturnType<typeof vi.fn>
queryEvents: ReturnType<typeof vi.fn>
brokerPid?: number
baseUrl?: string
agentNames: string[]
eventHistory: unknown[]
eventListeners: Set<(event: unknown) => void>
emitEvent: (event: unknown) => void
}

const mock = vi.hoisted(() => {
function createMockClient(agentNames: string[] = []): MockClient {
const client: MockClient = {
agentNames: [...agentNames],
eventHistory: [],
eventListeners: new Set(),
emitEvent: (event: unknown) => {
client.eventHistory.push(event)
for (const listener of client.eventListeners) listener(event)
},
getSession: vi.fn(async () => ({})),
listAgents: vi.fn(async () => client.agentNames.map((name) => ({ name, runtime: 'pty', channels: [] }))),
getInboundDeliveryMode: vi.fn(async () => 'passthrough'),
Expand All @@ -52,7 +62,22 @@ const mock = vi.hoisted(() => {
agents: client.agentNames.map((name) => ({ name, runtime: 'pty', channels: [] })),
pending_delivery_count: 0
})),
onEvent: vi.fn(() => () => undefined),
queryEvents: vi.fn((filter: { kind?: string; name?: string; limit?: number }) => {
const events = client.eventHistory.filter((event) => {
if (!event || typeof event !== 'object') return false
const record = event as Record<string, unknown>
if (filter.kind && record.kind !== filter.kind) return false
if (filter.name && record.name !== filter.name) return false
return true
})
return events.slice(-(filter.limit ?? events.length))
}),
onEvent: vi.fn((listener: (event: unknown) => void) => {
client.eventListeners.add(listener)
return () => {
client.eventListeners.delete(listener)
}
}),
addListener: vi.fn(() => () => undefined),
connectEvents: vi.fn(),
disconnectEvents: vi.fn(),
Expand All @@ -62,7 +87,20 @@ const mock = vi.hoisted(() => {
release: vi.fn(async () => undefined),
subscribeChannels: vi.fn(async () => undefined),
unsubscribeChannels: vi.fn(async () => undefined),
sendMessage: vi.fn(async () => ({ event_id: 'evt-message', targets: [] })),
sendMessage: vi.fn(async (input: { to?: string }) => {
const target = input.to || ''
const eventId = `evt-${Math.random().toString(16).slice(2)}`
if (target && !target.startsWith('#')) {
setImmediate(() => {
client.emitEvent({
kind: 'delivery_injected',
event_id: eventId,
name: target
})
})
}
return { event_id: eventId, targets: target && !target.startsWith('#') ? [target] : [] }
}),
brokerPid: 4242
}
return client
Expand Down Expand Up @@ -150,6 +188,7 @@ const originalResourcesPathDescriptor = Object.getOwnPropertyDescriptor(process,
const originalPlatformDescriptor = Object.getOwnPropertyDescriptor(process, 'platform')
const originalPublicEnv = process.env.PUBLIC
const originalProgramDataEnv = process.env.ProgramData
const originalPersonaHarnessReadyTimeoutEnv = process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS

function setProcessPlatform(platform: NodeJS.Platform): void {
Object.defineProperty(process, 'platform', {
Expand Down Expand Up @@ -243,6 +282,14 @@ async function writeAgentWorkforceFixture(projectDir: string): Promise<void> {
await chmod(winBin, 0o755)
}

function emitPersonaHarnessReady(client: MockClient, name: string): void {
client.emitEvent({
kind: 'worker_stream',
name,
chunk: 'Sandbox mount ready -> /tmp/agentworkforce-session\n'
})
}

describe('resolveAgentRelayMcpCommand', () => {
let tempDir: string | null = null
let extraTempDir: string | null = null
Expand Down Expand Up @@ -366,6 +413,11 @@ describe('BrokerManager local + cloud coexistence', () => {
})

afterEach(async () => {
if (originalPersonaHarnessReadyTimeoutEnv === undefined) {
delete process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS
} else {
process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = originalPersonaHarnessReadyTimeoutEnv
}
if (personaTempDir) await rm(personaTempDir, { recursive: true, force: true })
personaTempDir = null
})
Expand Down Expand Up @@ -605,6 +657,7 @@ describe('BrokerManager local + cloud coexistence', () => {
const local = lastSpawned()
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
local.agentNames.push(input.name)
setImmediate(() => emitPersonaHarnessReady(local, input.name))
return {
name: input.name,
runtime: 'pty',
Expand All @@ -615,6 +668,12 @@ describe('BrokerManager local + cloud coexistence', () => {

const result = await manager.spawnPersona(PROJECT_ID, 'autonomous-actor')

expect(local.spawnPty).toHaveBeenCalledWith(expect.objectContaining({
args: ['agent', 'autonomous-actor']
}))
expect(local.spawnPty).not.toHaveBeenCalledWith(expect.objectContaining({
args: expect.arrayContaining(['--install-in-repo'])
}))
expect(result).toEqual({
name: 'autonomous-actor',
runtime: 'pty',
Expand All @@ -639,6 +698,7 @@ describe('BrokerManager local + cloud coexistence', () => {
releaseSpawn = resolve
})
local.agentNames.push(input.name)
setImmediate(() => emitPersonaHarnessReady(local, input.name))
return {
name: input.name,
runtime: 'pty',
Expand Down Expand Up @@ -667,6 +727,178 @@ describe('BrokerManager local + cloud coexistence', () => {
await manager.shutdown()
})

it('releases a workforce persona wrapper that never reaches harness readiness', async () => {
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
await writeAgentWorkforceFixture(personaTempDir)
process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = '20'

const manager = new BrokerManager()
mock.state.nextLocalAgents = []
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
const local = lastSpawned()
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
local.agentNames.push(input.name)
return {
name: input.name,
runtime: 'pty',
cli: 'agentworkforce'
}
})

await expect(manager.spawnPersona(PROJECT_ID, 'autonomous-actor')).rejects.toThrow(
/Timed out waiting for Workforce persona autonomous-actor to prepare its harness/
)
expect(local.release).toHaveBeenCalledWith(
'autonomous-actor',
'persona harness readiness verification failed'
)

delete process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS
await manager.shutdown()
})

it('does not expose a workforce persona to listAgents until harness readiness passes', async () => {
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
await writeAgentWorkforceFixture(personaTempDir)

const manager = new BrokerManager()
mock.state.nextLocalAgents = []
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
const local = lastSpawned()
let releaseSpawn!: () => void
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
local.agentNames.push(input.name)
await new Promise<void>((resolve) => {
releaseSpawn = resolve
})
setImmediate(() => emitPersonaHarnessReady(local, input.name))
return {
name: input.name,
runtime: 'pty',
cli: 'agentworkforce'
}
})

const spawned = manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
await Promise.resolve()
await Promise.resolve()

await expect(manager.listAgents(PROJECT_ID)).resolves.toEqual([])
releaseSpawn()
await expect(spawned).resolves.toEqual({
name: 'autonomous-actor',
runtime: 'pty',
cli: 'claude'
})
expect((await manager.listAgents(PROJECT_ID)).map((agent) => agent.name)).toEqual(['autonomous-actor'])

await manager.shutdown()
})

it('does not expose a workforce persona to broker details until harness readiness passes', async () => {
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
await writeAgentWorkforceFixture(personaTempDir)

const manager = new BrokerManager()
mock.state.nextLocalAgents = []
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
const local = lastSpawned()
let releaseSpawn!: () => void
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
local.agentNames.push(input.name)
await new Promise<void>((resolve) => {
releaseSpawn = resolve
})
setImmediate(() => emitPersonaHarnessReady(local, input.name))
return {
name: input.name,
runtime: 'pty',
cli: 'agentworkforce'
}
})

const spawned = manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
await Promise.resolve()
await Promise.resolve()

const [pendingDetails] = await manager.listBrokerDetails()
expect(pendingDetails.agentCount).toBe(0)
expect(pendingDetails.agents).toEqual([])

releaseSpawn()
await expect(spawned).resolves.toEqual({
name: 'autonomous-actor',
runtime: 'pty',
cli: 'claude'
})

const [readyDetails] = await manager.listBrokerDetails()
expect(readyDetails.agentCount).toBe(1)
expect(readyDetails.agents.map((agent) => agent.name)).toEqual(['autonomous-actor'])

await manager.shutdown()
})

it('releases a workforce persona when broker delivery readiness is not confirmed', async () => {
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
await writeAgentWorkforceFixture(personaTempDir)
process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = '50'

const manager = new BrokerManager()
mock.state.nextLocalAgents = []
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
const local = lastSpawned()
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
local.agentNames.push(input.name)
setImmediate(() => emitPersonaHarnessReady(local, input.name))
return {
name: input.name,
runtime: 'pty',
cli: 'agentworkforce'
}
})
local.sendMessage.mockResolvedValueOnce({
event_id: 'evt-readiness-never-injected',
targets: ['autonomous-actor']
})

await expect(manager.spawnPersona(PROJECT_ID, 'autonomous-actor')).rejects.toThrow(
/did not become ready for broker delivery/
)
expect(local.release).toHaveBeenCalledWith(
'autonomous-actor',
'persona harness readiness verification failed'
)

await manager.shutdown()
})

it('does not reuse old sandbox-ready output for a new workforce persona launch', async () => {
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
await writeAgentWorkforceFixture(personaTempDir)
process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = '20'

const manager = new BrokerManager()
mock.state.nextLocalAgents = []
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
const local = lastSpawned()
emitPersonaHarnessReady(local, 'autonomous-actor')
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
local.agentNames.push(input.name)
return {
name: input.name,
runtime: 'pty',
cli: 'agentworkforce'
}
})

await expect(manager.spawnPersona(PROJECT_ID, 'autonomous-actor')).rejects.toThrow(
/Timed out waiting for Workforce persona autonomous-actor to prepare its harness/
)

await manager.shutdown()
})

it('spawning with broker: cloud fails clearly when no sandbox is attached', async () => {
const manager = new BrokerManager()
await startLocal(manager)
Expand Down
Loading
Loading