Skip to content

Commit 550a4f5

Browse files
kjgbotkjgbotagent-relay-code[bot]
authored
Harden workforce persona spawn readiness (#153)
* Harden workforce persona spawn readiness * chore: apply pr-reviewer fixes for #153 --------- Co-authored-by: kjgbot <kjgbot@agentrelay.dev> Co-authored-by: agent-relay-code[bot] <agent-relay-code[bot]@users.noreply.github.com>
1 parent 6721107 commit 550a4f5

4 files changed

Lines changed: 521 additions & 24 deletions

File tree

src/main/broker.test.ts

Lines changed: 234 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,25 @@ type MockClient = {
2828
unsubscribeChannels: ReturnType<typeof vi.fn>
2929
getStatus: ReturnType<typeof vi.fn>
3030
sendMessage: ReturnType<typeof vi.fn>
31+
queryEvents: ReturnType<typeof vi.fn>
3132
brokerPid?: number
3233
baseUrl?: string
3334
agentNames: string[]
35+
eventHistory: unknown[]
36+
eventListeners: Set<(event: unknown) => void>
37+
emitEvent: (event: unknown) => void
3438
}
3539

3640
const mock = vi.hoisted(() => {
3741
function createMockClient(agentNames: string[] = []): MockClient {
3842
const client: MockClient = {
3943
agentNames: [...agentNames],
44+
eventHistory: [],
45+
eventListeners: new Set(),
46+
emitEvent: (event: unknown) => {
47+
client.eventHistory.push(event)
48+
for (const listener of client.eventListeners) listener(event)
49+
},
4050
getSession: vi.fn(async () => ({})),
4151
listAgents: vi.fn(async () => client.agentNames.map((name) => ({ name, runtime: 'pty', channels: [] }))),
4252
getInboundDeliveryMode: vi.fn(async () => 'passthrough'),
@@ -52,7 +62,22 @@ const mock = vi.hoisted(() => {
5262
agents: client.agentNames.map((name) => ({ name, runtime: 'pty', channels: [] })),
5363
pending_delivery_count: 0
5464
})),
55-
onEvent: vi.fn(() => () => undefined),
65+
queryEvents: vi.fn((filter: { kind?: string; name?: string; limit?: number }) => {
66+
const events = client.eventHistory.filter((event) => {
67+
if (!event || typeof event !== 'object') return false
68+
const record = event as Record<string, unknown>
69+
if (filter.kind && record.kind !== filter.kind) return false
70+
if (filter.name && record.name !== filter.name) return false
71+
return true
72+
})
73+
return events.slice(-(filter.limit ?? events.length))
74+
}),
75+
onEvent: vi.fn((listener: (event: unknown) => void) => {
76+
client.eventListeners.add(listener)
77+
return () => {
78+
client.eventListeners.delete(listener)
79+
}
80+
}),
5681
addListener: vi.fn(() => () => undefined),
5782
connectEvents: vi.fn(),
5883
disconnectEvents: vi.fn(),
@@ -62,7 +87,20 @@ const mock = vi.hoisted(() => {
6287
release: vi.fn(async () => undefined),
6388
subscribeChannels: vi.fn(async () => undefined),
6489
unsubscribeChannels: vi.fn(async () => undefined),
65-
sendMessage: vi.fn(async () => ({ event_id: 'evt-message', targets: [] })),
90+
sendMessage: vi.fn(async (input: { to?: string }) => {
91+
const target = input.to || ''
92+
const eventId = `evt-${Math.random().toString(16).slice(2)}`
93+
if (target && !target.startsWith('#')) {
94+
setImmediate(() => {
95+
client.emitEvent({
96+
kind: 'delivery_injected',
97+
event_id: eventId,
98+
name: target
99+
})
100+
})
101+
}
102+
return { event_id: eventId, targets: target && !target.startsWith('#') ? [target] : [] }
103+
}),
66104
brokerPid: 4242
67105
}
68106
return client
@@ -150,6 +188,7 @@ const originalResourcesPathDescriptor = Object.getOwnPropertyDescriptor(process,
150188
const originalPlatformDescriptor = Object.getOwnPropertyDescriptor(process, 'platform')
151189
const originalPublicEnv = process.env.PUBLIC
152190
const originalProgramDataEnv = process.env.ProgramData
191+
const originalPersonaHarnessReadyTimeoutEnv = process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS
153192

154193
function setProcessPlatform(platform: NodeJS.Platform): void {
155194
Object.defineProperty(process, 'platform', {
@@ -243,6 +282,14 @@ async function writeAgentWorkforceFixture(projectDir: string): Promise<void> {
243282
await chmod(winBin, 0o755)
244283
}
245284

285+
function emitPersonaHarnessReady(client: MockClient, name: string): void {
286+
client.emitEvent({
287+
kind: 'worker_stream',
288+
name,
289+
chunk: 'Sandbox mount ready -> /tmp/agentworkforce-session\n'
290+
})
291+
}
292+
246293
describe('resolveAgentRelayMcpCommand', () => {
247294
let tempDir: string | null = null
248295
let extraTempDir: string | null = null
@@ -366,6 +413,11 @@ describe('BrokerManager local + cloud coexistence', () => {
366413
})
367414

368415
afterEach(async () => {
416+
if (originalPersonaHarnessReadyTimeoutEnv === undefined) {
417+
delete process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS
418+
} else {
419+
process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = originalPersonaHarnessReadyTimeoutEnv
420+
}
369421
if (personaTempDir) await rm(personaTempDir, { recursive: true, force: true })
370422
personaTempDir = null
371423
})
@@ -605,6 +657,7 @@ describe('BrokerManager local + cloud coexistence', () => {
605657
const local = lastSpawned()
606658
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
607659
local.agentNames.push(input.name)
660+
setImmediate(() => emitPersonaHarnessReady(local, input.name))
608661
return {
609662
name: input.name,
610663
runtime: 'pty',
@@ -615,6 +668,12 @@ describe('BrokerManager local + cloud coexistence', () => {
615668

616669
const result = await manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
617670

671+
expect(local.spawnPty).toHaveBeenCalledWith(expect.objectContaining({
672+
args: ['agent', 'autonomous-actor']
673+
}))
674+
expect(local.spawnPty).not.toHaveBeenCalledWith(expect.objectContaining({
675+
args: expect.arrayContaining(['--install-in-repo'])
676+
}))
618677
expect(result).toEqual({
619678
name: 'autonomous-actor',
620679
runtime: 'pty',
@@ -639,6 +698,7 @@ describe('BrokerManager local + cloud coexistence', () => {
639698
releaseSpawn = resolve
640699
})
641700
local.agentNames.push(input.name)
701+
setImmediate(() => emitPersonaHarnessReady(local, input.name))
642702
return {
643703
name: input.name,
644704
runtime: 'pty',
@@ -667,6 +727,178 @@ describe('BrokerManager local + cloud coexistence', () => {
667727
await manager.shutdown()
668728
})
669729

730+
it('releases a workforce persona wrapper that never reaches harness readiness', async () => {
731+
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
732+
await writeAgentWorkforceFixture(personaTempDir)
733+
process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = '20'
734+
735+
const manager = new BrokerManager()
736+
mock.state.nextLocalAgents = []
737+
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
738+
const local = lastSpawned()
739+
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
740+
local.agentNames.push(input.name)
741+
return {
742+
name: input.name,
743+
runtime: 'pty',
744+
cli: 'agentworkforce'
745+
}
746+
})
747+
748+
await expect(manager.spawnPersona(PROJECT_ID, 'autonomous-actor')).rejects.toThrow(
749+
/Timed out waiting for Workforce persona autonomous-actor to prepare its harness/
750+
)
751+
expect(local.release).toHaveBeenCalledWith(
752+
'autonomous-actor',
753+
'persona harness readiness verification failed'
754+
)
755+
756+
delete process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS
757+
await manager.shutdown()
758+
})
759+
760+
it('does not expose a workforce persona to listAgents until harness readiness passes', async () => {
761+
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
762+
await writeAgentWorkforceFixture(personaTempDir)
763+
764+
const manager = new BrokerManager()
765+
mock.state.nextLocalAgents = []
766+
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
767+
const local = lastSpawned()
768+
let releaseSpawn!: () => void
769+
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
770+
local.agentNames.push(input.name)
771+
await new Promise<void>((resolve) => {
772+
releaseSpawn = resolve
773+
})
774+
setImmediate(() => emitPersonaHarnessReady(local, input.name))
775+
return {
776+
name: input.name,
777+
runtime: 'pty',
778+
cli: 'agentworkforce'
779+
}
780+
})
781+
782+
const spawned = manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
783+
await Promise.resolve()
784+
await Promise.resolve()
785+
786+
await expect(manager.listAgents(PROJECT_ID)).resolves.toEqual([])
787+
releaseSpawn()
788+
await expect(spawned).resolves.toEqual({
789+
name: 'autonomous-actor',
790+
runtime: 'pty',
791+
cli: 'claude'
792+
})
793+
expect((await manager.listAgents(PROJECT_ID)).map((agent) => agent.name)).toEqual(['autonomous-actor'])
794+
795+
await manager.shutdown()
796+
})
797+
798+
it('does not expose a workforce persona to broker details until harness readiness passes', async () => {
799+
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
800+
await writeAgentWorkforceFixture(personaTempDir)
801+
802+
const manager = new BrokerManager()
803+
mock.state.nextLocalAgents = []
804+
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
805+
const local = lastSpawned()
806+
let releaseSpawn!: () => void
807+
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
808+
local.agentNames.push(input.name)
809+
await new Promise<void>((resolve) => {
810+
releaseSpawn = resolve
811+
})
812+
setImmediate(() => emitPersonaHarnessReady(local, input.name))
813+
return {
814+
name: input.name,
815+
runtime: 'pty',
816+
cli: 'agentworkforce'
817+
}
818+
})
819+
820+
const spawned = manager.spawnPersona(PROJECT_ID, 'autonomous-actor')
821+
await Promise.resolve()
822+
await Promise.resolve()
823+
824+
const [pendingDetails] = await manager.listBrokerDetails()
825+
expect(pendingDetails.agentCount).toBe(0)
826+
expect(pendingDetails.agents).toEqual([])
827+
828+
releaseSpawn()
829+
await expect(spawned).resolves.toEqual({
830+
name: 'autonomous-actor',
831+
runtime: 'pty',
832+
cli: 'claude'
833+
})
834+
835+
const [readyDetails] = await manager.listBrokerDetails()
836+
expect(readyDetails.agentCount).toBe(1)
837+
expect(readyDetails.agents.map((agent) => agent.name)).toEqual(['autonomous-actor'])
838+
839+
await manager.shutdown()
840+
})
841+
842+
it('releases a workforce persona when broker delivery readiness is not confirmed', async () => {
843+
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
844+
await writeAgentWorkforceFixture(personaTempDir)
845+
process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = '50'
846+
847+
const manager = new BrokerManager()
848+
mock.state.nextLocalAgents = []
849+
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
850+
const local = lastSpawned()
851+
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
852+
local.agentNames.push(input.name)
853+
setImmediate(() => emitPersonaHarnessReady(local, input.name))
854+
return {
855+
name: input.name,
856+
runtime: 'pty',
857+
cli: 'agentworkforce'
858+
}
859+
})
860+
local.sendMessage.mockResolvedValueOnce({
861+
event_id: 'evt-readiness-never-injected',
862+
targets: ['autonomous-actor']
863+
})
864+
865+
await expect(manager.spawnPersona(PROJECT_ID, 'autonomous-actor')).rejects.toThrow(
866+
/did not become ready for broker delivery/
867+
)
868+
expect(local.release).toHaveBeenCalledWith(
869+
'autonomous-actor',
870+
'persona harness readiness verification failed'
871+
)
872+
873+
await manager.shutdown()
874+
})
875+
876+
it('does not reuse old sandbox-ready output for a new workforce persona launch', async () => {
877+
personaTempDir = await mkdtemp(join(tmpdir(), 'pear-persona-spawn-'))
878+
await writeAgentWorkforceFixture(personaTempDir)
879+
process.env.PEAR_PERSONA_HARNESS_READY_TIMEOUT_MS = '20'
880+
881+
const manager = new BrokerManager()
882+
mock.state.nextLocalAgents = []
883+
await manager.start(PROJECT_ID, personaTempDir, 'pear-project-1', undefined as never, [])
884+
const local = lastSpawned()
885+
emitPersonaHarnessReady(local, 'autonomous-actor')
886+
local.spawnPty.mockImplementationOnce(async (input: { name: string }) => {
887+
local.agentNames.push(input.name)
888+
return {
889+
name: input.name,
890+
runtime: 'pty',
891+
cli: 'agentworkforce'
892+
}
893+
})
894+
895+
await expect(manager.spawnPersona(PROJECT_ID, 'autonomous-actor')).rejects.toThrow(
896+
/Timed out waiting for Workforce persona autonomous-actor to prepare its harness/
897+
)
898+
899+
await manager.shutdown()
900+
})
901+
670902
it('spawning with broker: cloud fails clearly when no sandbox is attached', async () => {
671903
const manager = new BrokerManager()
672904
await startLocal(manager)

0 commit comments

Comments
 (0)