diff --git a/CHANGELOG.md b/CHANGELOG.md index 75a987608..f4373d6f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking Changes +- `@agent-relay/sdk`: `AgentRelay` events move to a multi-listener registry. Use `relay.addListener('x', handler)` / `removeListener` in place of `relay.onX = handler` — the 13 `on*` fields (`onMessageReceived`, `onMessageSent`, `onAgentSpawned`, `onAgentReleased`, `onAgentExited`, `onAgentReady`, `onWorkerOutput`, `onDeliveryUpdate`, `onAgentExitRequested`, `onAgentIdle`, `onAgentActivityChanged`, `onChannelSubscribed`, `onChannelUnsubscribed`) are removed. +- `@agent-relay/sdk`: `channelSubscribed` / `channelUnsubscribed` handlers receive a single `{ agent, channels }` object instead of positional `(agent, channels)` args. +- `@agent-relay/sdk`: new `beforeAgentSpawn` / `afterAgentSpawn` / `beforeAgentRelease` / `afterAgentRelease` call-site hooks. `beforeAgentSpawn` listeners may return a `SpawnPatch` (shallow-merged in registration order) to mutate the spawn input before the broker POST. - Broker/SDK wire protocol is now version 2 for delivery terminal events and lifecycle event shape changes. - `relay.spawn({ task })` now returns `success: false` and terminates the agent when task delivery fails after retries. - `agent-relay send` now uses the orchestrator identity by default so `agent-relay replies ` can correlate worker DMs. @@ -19,6 +22,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Pass `--from` to `agent-relay send` when a script requires a specific sender identity. - Handle `success: false` from `relay.spawn()` calls that pass `task`; spawns without a task are unchanged. - Set `POSTHOG_PROJECT_KEY` in GitHub Actions repository variables before publishing telemetry-enabled artifacts. +- Update relay event handlers from field-assignment to `addListener`: + + ```ts + // Before + relay.onAgentSpawned = (agent) => log(agent.name); + + // After + const off = relay.addListener('agentSpawned', (agent) => log(agent.name)); + // ...later: off(); // unsubscribe + ``` + + Channel subscribe/unsubscribe handlers receive an object: `({ agent, channels }) => ...`. ### Added @@ -90,11 +105,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [6.3.3] - 2026-05-21 ### Product Perspective + #### User-Impacting Fixes + - Detect opencode api key completion (#934) (#934) ### Technical Perspective + #### Releases + - v6.3.3 --- @@ -102,11 +121,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [6.3.2] - 2026-05-20 ### Product Perspective + #### User-Impacting Fixes + - Stop worker stderr from rendering inside agent xterm (#931) (#931) ### Technical Perspective + #### Releases + - v6.3.2 --- @@ -114,15 +137,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [6.3.1] - 2026-05-20 ### Product Perspective + #### User-Impacting Fixes + - Pre-register Claude PTY workers so Relaycast MCP boots fast (#926) ### Technical Perspective + #### Dependencies & Tooling + - Retrigger flaky macOS Rust Tests - Drop change-implying framing from PTY pre-register note #### Releases + - v6.3.1 --- @@ -130,7 +158,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [6.3.0] - 2026-05-20 ### Technical Perspective + #### Releases + - v6.3.0 --- @@ -138,11 +168,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [6.2.8] - 2026-05-20 ### Product Perspective + #### User-Impacting Fixes + - Tighten PTY chrome scrubbing, document idle override, tame stale-state warning (#930) (#930) ### Technical Perspective + #### Releases + - v6.2.8 --- @@ -150,7 +184,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [6.2.7] - 2026-05-20 ### Technical Perspective + #### Releases + - v6.2.7 --- diff --git a/packages/acp-bridge/src/acp-agent.ts b/packages/acp-bridge/src/acp-agent.ts index 7f756c839..ac522052b 100644 --- a/packages/acp-bridge/src/acp-agent.ts +++ b/packages/acp-bridge/src/acp-agent.ts @@ -7,12 +7,7 @@ import { randomUUID } from 'node:crypto'; import * as acp from '@agentclientprotocol/sdk'; import { AgentRelay, type Agent, type Message } from '@agent-relay/sdk'; -import type { - ACPBridgeConfig, - SessionState, - RelayMessage, - BridgePromptResult, -} from './types.js'; +import type { ACPBridgeConfig, SessionState, RelayMessage, BridgePromptResult } from './types.js'; /** * Bounded circular cache for message deduplication. @@ -145,7 +140,7 @@ export class RelayACPAgent implements acp.Agent { private setupRelayHandlers(): void { if (!this.relay) return; - this.relay.onMessageReceived = (msg: Message) => { + this.relay.addListener('messageReceived', (msg: Message) => { this.handleRelayMessage({ id: msg.eventId, from: msg.from, @@ -153,7 +148,7 @@ export class RelayACPAgent implements acp.Agent { thread: msg.threadId, timestamp: Date.now(), }); - }; + }); } /** @@ -408,7 +403,7 @@ export class RelayACPAgent implements acp.Agent { }; } - if (!await this.ensureRelayReady()) { + if (!(await this.ensureRelayReady())) { await this.connection.sessionUpdate({ sessionId, update: { @@ -454,7 +449,7 @@ export class RelayACPAgent implements acp.Agent { // Send "thinking" indicator with target info const targetInfo = hasTargets - ? `Sending to ${targets.map(t => `@${t}`).join(', ')}...\n\n` + ? `Sending to ${targets.map((t) => `@${t}`).join(', ')}...\n\n` : 'Broadcasting to all agents...\n\n'; await this.connection.sessionUpdate({ @@ -503,13 +498,13 @@ export class RelayACPAgent implements acp.Agent { await this.connection.sessionUpdate({ sessionId, update: { - sessionUpdate: 'agent_message_chunk', - content: { - type: 'text', - text: 'Failed to send message to relay agents. Please check the relay broker connection.', - }, + sessionUpdate: 'agent_message_chunk', + content: { + type: 'text', + text: 'Failed to send message to relay agents. Please check the relay broker connection.', }, - }); + }, + }); return { success: false, @@ -731,7 +726,7 @@ export class RelayACPAgent implements acp.Agent { return true; } - if (!await this.ensureRelayReady()) { + if (!(await this.ensureRelayReady())) { await this.sendTextUpdate(sessionId, 'Relay broker is not connected (cannot spawn).'); return true; } @@ -769,7 +764,7 @@ export class RelayACPAgent implements acp.Agent { return true; } - if (!await this.ensureRelayReady()) { + if (!(await this.ensureRelayReady())) { await this.sendTextUpdate(sessionId, 'Relay broker is not connected (cannot release).'); return true; } @@ -794,7 +789,7 @@ export class RelayACPAgent implements acp.Agent { } private async handleListAgentsCommand(sessionId: string): Promise { - if (!await this.ensureRelayReady()) { + if (!(await this.ensureRelayReady())) { await this.sendTextUpdate(sessionId, 'Relay broker is not connected (cannot list agents).'); return true; } diff --git a/packages/openclaw/src/spawn/process.ts b/packages/openclaw/src/spawn/process.ts index 5fc449716..e7aa84e55 100644 --- a/packages/openclaw/src/spawn/process.ts +++ b/packages/openclaw/src/spawn/process.ts @@ -132,7 +132,7 @@ export class ProcessSpawnProvider implements SpawnProvider { }, cwd: workspacePath, stdio: ['pipe', 'pipe', 'pipe'], - }, + } ); gatewayProcess.stderr?.on('data', (data: Buffer) => { @@ -176,14 +176,14 @@ export class ProcessSpawnProvider implements SpawnProvider { cli: 'node', args: [bridgePath], channels, - task: options.systemPrompt - ? `${options.systemPrompt}\n\n${identityTask}` - : identityTask, + task: options.systemPrompt ? `${options.systemPrompt}\n\n${identityTask}` : identityTask, }); - relay.onAgentExited = (agent) => { - process.stderr.write(`[spawn:${options.name}] Agent exited: ${agent.name} code=${agent.exitCode ?? 'none'}\n`); - }; + relay.addListener('agentExited', (agent) => { + process.stderr.write( + `[spawn:${options.name}] Agent exited: ${agent.name} code=${agent.exitCode ?? 'none'}\n` + ); + }); } catch (err) { // If SDK broker spawn fails, clean up gateway and propagate gatewayProcess.kill('SIGTERM'); @@ -191,7 +191,7 @@ export class ProcessSpawnProvider implements SpawnProvider { await relay.shutdown().catch(() => {}); } throw new Error( - `Failed to start broker for "${options.name}": ${err instanceof Error ? err.message : String(err)}`, + `Failed to start broker for "${options.name}": ${err instanceof Error ? err.message : String(err)}` ); } diff --git a/packages/sdk/src/__tests__/agent-activity.test.ts b/packages/sdk/src/__tests__/agent-activity.test.ts index d34fa2187..d43d685c4 100644 --- a/packages/sdk/src/__tests__/agent-activity.test.ts +++ b/packages/sdk/src/__tests__/agent-activity.test.ts @@ -27,13 +27,13 @@ function wireRelay(relay: AgentRelay, client: ReturnType { +describe('AgentRelay agentActivityChanged listener', () => { it('emits active on first delivery event', () => { const relay = new AgentRelay(); const { client, emit } = createMockFacadeClient(); wireRelay(relay, client); const changes: AgentActivityChange[] = []; - relay.onAgentActivityChanged = (change) => changes.push(change); + relay.addListener('agentActivityChanged', (change) => changes.push(change)); emit({ kind: 'delivery_queued', name: 'worker-1', delivery_id: 'd1', event_id: 'e1', timestamp: 1 }); @@ -53,7 +53,7 @@ describe('AgentRelay onAgentActivityChanged', () => { const { client, emit } = createMockFacadeClient(); wireRelay(relay, client); const changes: AgentActivityChange[] = []; - relay.onAgentActivityChanged = (change) => changes.push(change); + relay.addListener('agentActivityChanged', (change) => changes.push(change)); emit({ kind: 'delivery_queued', name: 'worker-1', delivery_id: 'd1', event_id: 'e1', timestamp: 1 }); emit({ kind: 'delivery_injected', name: 'worker-1', delivery_id: 'd1', event_id: 'e1', timestamp: 2 }); @@ -69,7 +69,7 @@ describe('AgentRelay onAgentActivityChanged', () => { const { client, emit } = createMockFacadeClient(); wireRelay(relay, client); const changes: AgentActivityChange[] = []; - relay.onAgentActivityChanged = (change) => changes.push(change); + relay.addListener('agentActivityChanged', (change) => changes.push(change)); emit({ kind: 'delivery_queued', name: 'worker-1', delivery_id: 'd1', event_id: 'e1', timestamp: 1 }); emit({ @@ -94,7 +94,7 @@ describe('AgentRelay onAgentActivityChanged', () => { const { client, emit } = createMockFacadeClient(); wireRelay(relay, client); const changes: AgentActivityChange[] = []; - relay.onAgentActivityChanged = (change) => changes.push(change); + relay.addListener('agentActivityChanged', (change) => changes.push(change)); emit({ kind: 'delivery_queued', name: 'worker-1', delivery_id: 'd1', event_id: 'e1', timestamp: 1 }); emit({ kind: 'agent_idle', name: 'worker-1', idle_secs: 30 }); @@ -113,7 +113,7 @@ describe('AgentRelay onAgentActivityChanged', () => { const { client, emit } = createMockFacadeClient(); wireRelay(relay, client); const changes: AgentActivityChange[] = []; - relay.onAgentActivityChanged = (change) => changes.push(change); + relay.addListener('agentActivityChanged', (change) => changes.push(change)); emit({ kind: 'delivery_queued', name: 'worker-1', delivery_id: 'd1', event_id: 'e1', timestamp: 1 }); emit({ kind: 'agent_idle', name: 'worker-1', idle_secs: 30 }); @@ -142,7 +142,7 @@ describe('AgentRelay onAgentActivityChanged', () => { const { client, emit } = createMockFacadeClient(); wireRelay(relay, client); const changes: AgentActivityChange[] = []; - relay.onAgentActivityChanged = (change) => changes.push(change); + relay.addListener('agentActivityChanged', (change) => changes.push(change)); emit({ kind: 'delivery_queued', name: 'worker-1', delivery_id: 'd1', event_id: 'e1', timestamp: 1 }); emit({ kind: 'delivery_queued', name: 'worker-1', delivery_id: 'd2', event_id: 'e2', timestamp: 2 }); @@ -166,7 +166,7 @@ describe('AgentRelay onAgentActivityChanged', () => { const { client, emit } = createMockFacadeClient(); wireRelay(relay, client); const changes: AgentActivityChange[] = []; - relay.onAgentActivityChanged = (change) => changes.push(change); + relay.addListener('agentActivityChanged', (change) => changes.push(change)); emit({ kind: 'delivery_queued', name: 'worker-1', delivery_id: 'd1', event_id: 'e1', timestamp: 1 }); emit({ kind: 'agent_exited', name: 'worker-1', code: 0, signal: undefined }); diff --git a/packages/sdk/src/__tests__/builder-resume-persistence.test.ts b/packages/sdk/src/__tests__/builder-resume-persistence.test.ts index 032c6e442..ddd9c107e 100644 --- a/packages/sdk/src/__tests__/builder-resume-persistence.test.ts +++ b/packages/sdk/src/__tests__/builder-resume-persistence.test.ts @@ -55,13 +55,7 @@ vi.mock('@relaycast/sdk', () => ({ const mockRelayInstance = { shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentReleased: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, - onWorkerOutput: null as any, - onDeliveryUpdate: null as any, + addListener: vi.fn(() => () => {}), }; vi.mock('../relay.js', () => ({ diff --git a/packages/sdk/src/__tests__/completion-pipeline.test.ts b/packages/sdk/src/__tests__/completion-pipeline.test.ts index 29b170f37..c6bbcd1d8 100644 --- a/packages/sdk/src/__tests__/completion-pipeline.test.ts +++ b/packages/sdk/src/__tests__/completion-pipeline.test.ts @@ -128,24 +128,36 @@ const defaultSpawnPtyImplementation = async ({ name, task }: { name: string; tas : 'STEP_COMPLETE:unknown\n'); queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } + emitRelayEvent('workerOutput', { name, chunk: output }); }); return { ...mockAgent, name }; }; +const relayListeners = new Map void>>(); + +function emitRelayEvent(event: string, payload: any) { + const set = relayListeners.get(event); + if (!set) return; + for (const fn of set) fn(payload); +} + const mockRelayInstance = { spawnPty: vi.fn().mockImplementation(defaultSpawnPtyImplementation), human: vi.fn().mockReturnValue(mockHuman), shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, + addListener: vi.fn((event: string, fn: (...args: any[]) => void) => { + let set = relayListeners.get(event); + if (!set) { + set = new Set(); + relayListeners.set(event, set); + } + set.add(fn); + return () => { + set!.delete(fn); + }; + }), listAgents: vi.fn().mockResolvedValue([]), listAgentsRaw: vi.fn().mockResolvedValue([]), }; @@ -154,7 +166,7 @@ let relayEventCounter = 0; function emitRelayChannelMessage(message: { from: string; to: string; text: string }) { setTimeout(() => { - mockRelayInstance.onMessageReceived?.({ + emitRelayEvent('messageReceived', { eventId: `evt-${++relayEventCounter}`, from: message.from, to: message.to, @@ -304,7 +316,7 @@ describe('Completion Pipeline', () => { mockSpawnOutputs = []; mockAgent.release.mockResolvedValue(undefined); mockRelayInstance.spawnPty.mockImplementation(defaultSpawnPtyImplementation); - mockRelayInstance.onWorkerOutput = null; + relayListeners.clear(); db = makeDb(); runner = new WorkflowRunner({ db, workspaceId: 'ws-test' }); }); @@ -999,9 +1011,7 @@ describe('Completion Pipeline', () => { : 'STEP_COMPLETE:unknown\n'; queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } + emitRelayEvent('workerOutput', { name, chunk: output }); }); const agent = { ...mockAgent, name }; diff --git a/packages/sdk/src/__tests__/e2e-owner-review.test.ts b/packages/sdk/src/__tests__/e2e-owner-review.test.ts index bfbaca95f..da47d9c95 100644 --- a/packages/sdk/src/__tests__/e2e-owner-review.test.ts +++ b/packages/sdk/src/__tests__/e2e-owner-review.test.ts @@ -94,25 +94,35 @@ const defaultSpawnPtyImplementation = async ({ name, task }: { name: string; tas ? `STEP_COMPLETE:${stepComplete}\n` : 'STEP_COMPLETE:unknown\n'); - queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } - }); + queueMicrotask(() => emitRelayEvent('workerOutput', { name, chunk: output })); return { ...mockAgent, name }; }; +// Listener registry for the AgentRelay mock — the production AgentRelay +// uses addListener('eventName', handler), so the mock captures handlers +// here keyed by event name. Tests fire events via `emitRelayEvent`. +const relayListeners = new Map void>>(); +function emitRelayEvent(event: string, payload: unknown): void { + for (const handler of relayListeners.get(event) ?? []) { + handler(payload); + } +} + const mockRelayInstance = { spawnPty: vi.fn().mockImplementation(defaultSpawnPtyImplementation), human: vi.fn().mockReturnValue(mockHuman), shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, + addListener: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + let set = relayListeners.get(event); + if (!set) { + set = new Set(); + relayListeners.set(event, set); + } + set.add(handler); + return () => set!.delete(handler); + }), listAgentsRaw: vi.fn().mockResolvedValue([]), }; @@ -214,7 +224,7 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { mockSpawnOutputs = []; mockAgent.release.mockResolvedValue(undefined); mockRelayInstance.spawnPty.mockImplementation(defaultSpawnPtyImplementation); - mockRelayInstance.onWorkerOutput = null; + relayListeners.clear(); db = makeDb(); runner = new WorkflowRunner({ db, workspaceId: 'ws-test' }); }); @@ -403,11 +413,7 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { ? `STEP_COMPLETE:${stepComplete}\n` : 'STEP_COMPLETE:unknown\n'; - queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } - }); + queueMicrotask(() => emitRelayEvent('workerOutput', { name, chunk: output })); if (!isReview) { return { ...mockAgent, name }; @@ -538,11 +544,7 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { : 'worker finished\n'; if (output) { - queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } - }); + queueMicrotask(() => emitRelayEvent('workerOutput', { name, chunk: output })); } return { @@ -579,11 +581,7 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { : 'worker finished\n'; if (output) { - queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } - }); + queueMicrotask(() => emitRelayEvent('workerOutput', { name, chunk: output })); } return { diff --git a/packages/sdk/src/__tests__/event-bus.test.ts b/packages/sdk/src/__tests__/event-bus.test.ts new file mode 100644 index 000000000..3b582f7ac --- /dev/null +++ b/packages/sdk/src/__tests__/event-bus.test.ts @@ -0,0 +1,161 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { EventBus, type EventMap } from '../event-bus.js'; + +interface Events extends EventMap { + ping: [{ id: number }]; + warm: [string, number]; + silent: []; +} + +describe('EventBus', () => { + it('dispatches to multiple listeners in registration order', async () => { + const bus = new EventBus(); + const calls: number[] = []; + bus.addListener('ping', (p) => { + calls.push(p.id * 10 + 1); + }); + bus.addListener('ping', (p) => { + calls.push(p.id * 10 + 2); + }); + bus.addListener('ping', (p) => { + calls.push(p.id * 10 + 3); + }); + + await bus.emit('ping', { id: 5 }); + + expect(calls).toEqual([51, 52, 53]); + }); + + it('returns an unsubscribe function that removes the handler', async () => { + const bus = new EventBus(); + const a = vi.fn(); + const b = vi.fn(); + const offA = bus.addListener('ping', a); + bus.addListener('ping', b); + expect(bus.listenerCount('ping')).toBe(2); + + offA(); + + expect(bus.listenerCount('ping')).toBe(1); + await bus.emit('ping', { id: 1 }); + expect(a).not.toHaveBeenCalled(); + expect(b).toHaveBeenCalledTimes(1); + }); + + it('removeListener is idempotent', () => { + const bus = new EventBus(); + const a = vi.fn(); + bus.addListener('ping', a); + bus.removeListener('ping', a); + bus.removeListener('ping', a); + expect(bus.listenerCount('ping')).toBe(0); + }); + + it('returned unsubscribe is idempotent and does not clobber later registrations', async () => { + const bus = new EventBus(); + const h1 = vi.fn(); + const h2 = vi.fn(); + const off = bus.addListener('ping', h1); + off(); + bus.addListener('ping', h2); + // The second call to the original unsubscribe must NOT wipe the + // freshly-registered h2 — a stale-closure bug would do exactly that. + off(); + expect(bus.listenerCount('ping')).toBe(1); + await bus.emit('ping', { id: 1 }); + expect(h1).not.toHaveBeenCalled(); + expect(h2).toHaveBeenCalledTimes(1); + }); + + it('drops the event-name entry when the last listener is removed', () => { + const bus = new EventBus(); + const a = vi.fn(); + const off = bus.addListener('ping', a); + expect(bus.listenerCount('ping')).toBe(1); + off(); + expect(bus.listenerCount('ping')).toBe(0); + // Re-add should restart cleanly + bus.addListener('ping', a); + expect(bus.listenerCount('ping')).toBe(1); + }); + + it('awaits async listeners sequentially', async () => { + const bus = new EventBus(); + const order: string[] = []; + bus.addListener('ping', async () => { + await new Promise((r) => setTimeout(r, 10)); + order.push('a'); + }); + bus.addListener('ping', () => { + order.push('b'); + }); + + await bus.emit('ping', { id: 0 }); + + expect(order).toEqual(['a', 'b']); + }); + + it('catches and logs handler errors without aborting the chain', async () => { + const bus = new EventBus(); + const consoleErr = vi.spyOn(console, 'error').mockImplementation(() => {}); + const after = vi.fn(); + bus.addListener('ping', () => { + throw new Error('boom'); + }); + bus.addListener('ping', after); + + await bus.emit('ping', { id: 1 }); + + expect(after).toHaveBeenCalledTimes(1); + expect(consoleErr).toHaveBeenCalled(); + consoleErr.mockRestore(); + }); + + it('listeners() snapshot is safe under concurrent mutation', async () => { + const bus = new EventBus(); + const calls: string[] = []; + // Forward-declared so the handler can self-remove via the unsubscribe. + // eslint-disable-next-line prefer-const + let off!: () => void; + bus.addListener('ping', () => { + calls.push('first'); + // Self-remove from inside a handler — must not affect the current emit pass. + off(); + }); + off = bus.addListener('ping', () => { + calls.push('second'); + }); + + await bus.emit('ping', { id: 0 }); + expect(calls).toEqual(['first', 'second']); + // After the emit, the second handler is gone + await bus.emit('ping', { id: 1 }); + expect(calls).toEqual(['first', 'second', 'first']); + }); + + it('passes through positional args of arbitrary arity', async () => { + const bus = new EventBus(); + const handler = vi.fn(); + bus.addListener('warm', handler); + + await bus.emit('warm', 'hello', 42); + + expect(handler).toHaveBeenCalledWith('hello', 42); + }); + + it('handles a zero-arg event', async () => { + const bus = new EventBus(); + const handler = vi.fn(); + bus.addListener('silent', handler); + + await bus.emit('silent'); + + expect(handler).toHaveBeenCalledWith(); + }); + + it('does not blow up when emit is called with no listeners', async () => { + const bus = new EventBus(); + await expect(bus.emit('ping', { id: 1 })).resolves.toBeUndefined(); + }); +}); diff --git a/packages/sdk/src/__tests__/facade.test.ts b/packages/sdk/src/__tests__/facade.test.ts index 8acb6a83f..81c0f03ca 100644 --- a/packages/sdk/src/__tests__/facade.test.ts +++ b/packages/sdk/src/__tests__/facade.test.ts @@ -60,7 +60,7 @@ test('facade: spawn with initial task delivers task after worker_ready', async ( const suffix = Date.now().toString(36); const relay = makeRelay(bin); const readyNames: string[] = []; - relay.onAgentReady = (agent) => readyNames.push(agent.name); + relay.addListener('agentReady', (agent) => readyNames.push(agent.name)); try { const agent = await relay.spawnPty({ @@ -73,7 +73,7 @@ test('facade: spawn with initial task delivers task after worker_ready', async ( // Wait a bit for worker_ready event to propagate await new Promise((r) => setTimeout(r, 2_000)); - assert.ok(readyNames.includes(agent.name), 'onAgentReady should fire for spawned agent'); + assert.ok(readyNames.includes(agent.name), 'agentReady listener should fire for spawned agent'); await agent.release(); } finally { @@ -81,9 +81,9 @@ test('facade: spawn with initial task delivers task after worker_ready', async ( } }); -// ── onAgentReady ──────────────────────────────────────────────────────────── +// ── agentReady listener ───────────────────────────────────────────────────── -test('facade: onAgentReady fires when worker becomes ready', async (t) => { +test('facade: agentReady listener fires when worker becomes ready', async (t) => { if (!requireRelaycast(t)) return; const bin = requireBinary(t); if (!bin) return; @@ -92,7 +92,7 @@ test('facade: onAgentReady fires when worker becomes ready', async (t) => { const relay = makeRelay(bin); const readyAgents: Agent[] = []; - relay.onAgentReady = (agent) => readyAgents.push(agent); + relay.addListener('agentReady', (agent) => readyAgents.push(agent)); try { const agent = await relay.spawnPty({ @@ -106,7 +106,7 @@ test('facade: onAgentReady fires when worker becomes ready', async (t) => { assert.ok( readyAgents.some((a) => a.name === agent.name), - 'onAgentReady should fire with the correct agent' + 'agentReady listener should fire with the correct agent' ); await agent.release(); @@ -125,7 +125,7 @@ test('facade: broadcast sends to all agents', async (t) => { const suffix = Date.now().toString(36); const relay = makeRelay(bin); const sentMessages: Message[] = []; - relay.onMessageSent = (msg) => sentMessages.push(msg); + relay.addListener('messageSent', (msg) => sentMessages.push(msg)); try { const agent = await relay.spawnPty({ @@ -207,7 +207,7 @@ test('facade: waitForAny respects timeout', async (t) => { // ── exit code ─────────────────────────────────────────────────────────────── -test('facade: onAgentExited populates exitCode and exitSignal', async (t) => { +test('facade: agentExited listener populates exitCode and exitSignal', async (t) => { if (!requireRelaycast(t)) return; const bin = requireBinary(t); if (!bin) return; @@ -215,7 +215,7 @@ test('facade: onAgentExited populates exitCode and exitSignal', async (t) => { const suffix = Date.now().toString(36); const relay = makeRelay(bin); const exitedAgents: Agent[] = []; - relay.onAgentExited = (agent) => exitedAgents.push(agent); + relay.addListener('agentExited', (agent) => exitedAgents.push(agent)); try { const agent = await relay.spawnPty({ diff --git a/packages/sdk/src/__tests__/idle-nudge.test.ts b/packages/sdk/src/__tests__/idle-nudge.test.ts index 8a215f7bc..86b85bedd 100644 --- a/packages/sdk/src/__tests__/idle-nudge.test.ts +++ b/packages/sdk/src/__tests__/idle-nudge.test.ts @@ -87,7 +87,7 @@ vi.mock('../relay.js', () => ({ human: vi.fn().mockReturnValue(mockHuman), shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), - onWorkerOutput: null, + addListener: vi.fn(() => () => {}), listAgentsRaw: vi.fn().mockResolvedValue([]), })), })); diff --git a/packages/sdk/src/__tests__/lifecycle-hooks.test.ts b/packages/sdk/src/__tests__/lifecycle-hooks.test.ts new file mode 100644 index 000000000..200fd4e32 --- /dev/null +++ b/packages/sdk/src/__tests__/lifecycle-hooks.test.ts @@ -0,0 +1,249 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { AgentRelayClient } from '../client.js'; +import type { + AfterAgentReleaseContext, + AfterAgentSpawnContext, + BeforeAgentReleaseContext, + BeforeAgentSpawnContext, + SpawnPatch, +} from '../lifecycle-hooks.js'; +import type { SpawnPtyInput } from '../types.js'; + +/** + * Build a mock fetch that records every request and returns a 200 JSON + * payload constructed from the request body. For `/api/spawn` POSTs, the + * response is `{ name, runtime: 'pty' }` so the client's spawn methods + * can resolve cleanly without a live broker. + */ +function makeMockFetch( + responses: Array<(req: { method: string; body: unknown; path: string }) => unknown> = [] +) { + const captures: Array<{ path: string; method: string; body: unknown }> = []; + const fetchFn = vi.fn(async (url: string | URL | Request, init?: RequestInit) => { + const u = typeof url === 'string' ? url : url.toString(); + const path = new URL(u).pathname; + const method = (init?.method ?? 'GET').toUpperCase(); + const bodyText = typeof init?.body === 'string' ? init.body : ''; + const body = bodyText ? JSON.parse(bodyText) : undefined; + captures.push({ path, method, body }); + const next = responses.shift(); + const payload = next + ? next({ method, body, path }) + : path === '/api/spawn' + ? { name: (body as { name?: string })?.name ?? 'spawned', runtime: 'pty' } + : path.startsWith('/api/spawned/') + ? { name: decodeURIComponent(path.slice('/api/spawned/'.length).split('/')[0]) } + : {}; + return new Response(JSON.stringify(payload), { + status: 200, + headers: { 'Content-Type': 'application/json' }, + }); + }); + return { fetchFn, captures }; +} + +function makeClient(fetchFn: typeof globalThis.fetch): AgentRelayClient { + return new AgentRelayClient({ baseUrl: 'http://broker.test', apiKey: 'k', fetch: fetchFn }); +} + +describe('AgentRelayClient lifecycle hooks', () => { + it('emits beforeAgentSpawn before the HTTP POST and afterAgentSpawn after', async () => { + const { fetchFn, captures } = makeMockFetch(); + const client = makeClient(fetchFn); + const events: string[] = []; + + client.addListener('beforeAgentSpawn', (ctx) => { + events.push(`before:${ctx.kind}:${ctx.input.name}`); + // Fetch must not have been called yet + expect(fetchFn).not.toHaveBeenCalled(); + }); + client.addListener('afterAgentSpawn', (ctx) => { + events.push(`after:${ctx.result?.name ?? 'no-result'}`); + }); + + const result = await client.spawnPty({ name: 'agent-a', cli: 'claude' }); + + expect(result).toEqual({ name: 'agent-a', runtime: 'pty' }); + expect(events).toEqual(['before:pty:agent-a', 'after:agent-a']); + expect(captures).toHaveLength(1); + expect(captures[0].path).toBe('/api/spawn'); + }); + + it('folds beforeAgentSpawn patches into resolvedInput before POST', async () => { + const { fetchFn, captures } = makeMockFetch(); + const client = makeClient(fetchFn); + + client.addListener('beforeAgentSpawn', () => ({ + args: ['--session-id', 'abc-123'], + })); + client.addListener('beforeAgentSpawn', () => ({ + task: 'override-task', + })); + + const after = vi.fn(); + client.addListener('afterAgentSpawn', after); + + await client.spawnPty({ name: 'agent-b', cli: 'claude', args: ['--initial'] }); + + expect(captures[0].body).toMatchObject({ + name: 'agent-b', + cli: 'claude', + args: ['--session-id', 'abc-123'], // patch replaced the array + task: 'override-task', + }); + expect(after).toHaveBeenCalledTimes(1); + const ctx = after.mock.calls[0][0] as AfterAgentSpawnContext; + expect(ctx.resolvedInput.args).toEqual(['--session-id', 'abc-123']); + expect((ctx.resolvedInput as SpawnPtyInput).task).toBe('override-task'); + }); + + it('merges patches in registration order (later wins)', async () => { + const { fetchFn, captures } = makeMockFetch(); + const client = makeClient(fetchFn); + + client.addListener('beforeAgentSpawn', () => ({ model: 'haiku' })); + client.addListener('beforeAgentSpawn', () => ({ model: 'opus' })); + + await client.spawnPty({ name: 'm', cli: 'claude' }); + + expect(captures[0].body).toMatchObject({ model: 'opus' }); + }); + + it('void return is observe-only — no patch applied', async () => { + const { fetchFn, captures } = makeMockFetch(); + const client = makeClient(fetchFn); + + const observer = vi.fn(() => undefined); + client.addListener('beforeAgentSpawn', observer); + + await client.spawnPty({ name: 'obs', cli: 'claude', args: ['x', 'y'] }); + + expect(observer).toHaveBeenCalledTimes(1); + expect(captures[0].body).toMatchObject({ args: ['x', 'y'] }); + }); + + it('hook errors are caught — spawn still proceeds with the prior resolved input', async () => { + const { fetchFn, captures } = makeMockFetch(); + const client = makeClient(fetchFn); + const consoleErr = vi.spyOn(console, 'error').mockImplementation(() => {}); + + client.addListener('beforeAgentSpawn', () => ({ args: ['--first'] })); + client.addListener('beforeAgentSpawn', () => { + throw new Error('hook went boom'); + }); + client.addListener('beforeAgentSpawn', () => ({ task: 'survived' })); + + await client.spawnPty({ name: 'resilient', cli: 'claude' }); + + expect(captures[0].body).toMatchObject({ + args: ['--first'], + task: 'survived', + }); + expect(consoleErr).toHaveBeenCalled(); + consoleErr.mockRestore(); + }); + + it('afterAgentSpawn fires with error context when the HTTP call rejects', async () => { + const fetchFn = vi.fn(async () => { + return new Response('boom', { status: 500, statusText: 'Internal Error' }); + }); + const client = new AgentRelayClient({ + baseUrl: 'http://broker.test', + apiKey: 'k', + fetch: fetchFn, + }); + + const seen: AfterAgentSpawnContext[] = []; + client.addListener('afterAgentSpawn', (ctx) => { + seen.push(ctx); + }); + + await expect(client.spawnPty({ name: 'fail', cli: 'claude' })).rejects.toThrow(); + expect(seen).toHaveLength(1); + expect(seen[0].error).toBeInstanceOf(Error); + expect(seen[0].result).toBeUndefined(); + expect(seen[0].resolvedInput.name).toBe('fail'); + }); + + it('captures spawnerPid + spawnStartTs on the before context', async () => { + const { fetchFn } = makeMockFetch(); + const client = makeClient(fetchFn); + + let captured: BeforeAgentSpawnContext | undefined; + client.addListener('beforeAgentSpawn', (ctx) => { + captured = ctx; + }); + + await client.spawnPty({ name: 'p', cli: 'claude' }); + + expect(captured).toBeDefined(); + expect(captured!.spawnerPid).toBe(process.pid); + expect(captured!.spawnStartTs).toMatch(/^\d{4}-\d{2}-\d{2}T/); + expect(captured!.baseUrl).toBe('http://broker.test'); + expect(captured!.kind).toBe('pty'); + }); + + it('spawnProvider fires the hooks with kind=provider', async () => { + const { fetchFn } = makeMockFetch(); + const client = makeClient(fetchFn); + const before = vi.fn(); + const after = vi.fn(); + client.addListener('beforeAgentSpawn', before); + client.addListener('afterAgentSpawn', after); + + await client.spawnProvider({ name: 'p', provider: 'claude' }); + + expect(before).toHaveBeenCalledTimes(1); + expect((before.mock.calls[0][0] as BeforeAgentSpawnContext).kind).toBe('provider'); + expect(after).toHaveBeenCalledTimes(1); + expect((after.mock.calls[0][0] as AfterAgentSpawnContext).kind).toBe('provider'); + }); + + it('release fires beforeAgentRelease then afterAgentRelease', async () => { + const { fetchFn } = makeMockFetch(); + const client = makeClient(fetchFn); + const order: string[] = []; + + client.addListener('beforeAgentRelease', (ctx: BeforeAgentReleaseContext) => { + order.push(`before:${ctx.name}:${ctx.reason ?? ''}`); + }); + client.addListener('afterAgentRelease', (ctx: AfterAgentReleaseContext) => { + order.push(`after:${ctx.name}:${ctx.durationMs >= 0 ? 'ok' : 'bad'}`); + }); + + await client.release('agent-x', 'cleanup'); + + expect(order).toEqual(['before:agent-x:cleanup', 'after:agent-x:ok']); + }); + + it('removeListener stops further deliveries', async () => { + const { fetchFn } = makeMockFetch(); + const client = makeClient(fetchFn); + const fn = vi.fn(); + client.addListener('beforeAgentSpawn', fn); + await client.spawnPty({ name: 'a', cli: 'claude' }); + expect(fn).toHaveBeenCalledTimes(1); + + client.removeListener('beforeAgentSpawn', fn); + await client.spawnPty({ name: 'b', cli: 'claude' }); + expect(fn).toHaveBeenCalledTimes(1); + }); + + it('patch shape: extending args requires explicit spread', async () => { + // Documents the array-replace contract: a patch's `args` overrides + // the previous `args` outright; handlers must spread to extend. + const { fetchFn, captures } = makeMockFetch(); + const client = makeClient(fetchFn); + client.addListener( + 'beforeAgentSpawn', + (ctx): SpawnPatch => ({ + args: [...(ctx.input.args ?? []), '--extra'], + }) + ); + + await client.spawnPty({ name: 'a', cli: 'claude', args: ['--orig'] }); + + expect(captures[0].body).toMatchObject({ args: ['--orig', '--extra'] }); + }); +}); diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index 90ce9a17d..c9e926115 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -1230,9 +1230,9 @@ describe('AgentRelay orchestration handles', () => { const relay = new AgentRelay(); const seenFrom: string[] = []; - relay.onMessageReceived = (message) => { + relay.addListener('messageReceived', (message) => { seenFrom.push(message.from); - }; + }); try { await relay.listAgents(); // Ensure event wiring is initialized. @@ -1422,7 +1422,7 @@ describe('Agent.status computed getter', () => { const relay = new AgentRelay(); const exitedReasons: Array = []; - relay.onAgentExited = (agent) => exitedReasons.push(agent.exitReason); + relay.addListener('agentExited', (agent) => exitedReasons.push(agent.exitReason)); try { await relay.spawnPty({ name: 'reason-exited', diff --git a/packages/sdk/src/__tests__/quickstart.test.ts b/packages/sdk/src/__tests__/quickstart.test.ts index 98aae47bf..0431ba359 100644 --- a/packages/sdk/src/__tests__/quickstart.test.ts +++ b/packages/sdk/src/__tests__/quickstart.test.ts @@ -61,10 +61,10 @@ test('facade: spawn → message → list → release → shutdown', async (t) => const receivedMessages: Message[] = []; const sentMessages: Message[] = []; - relay.onAgentSpawned = (agent) => spawnedNames.push(agent.name); - relay.onAgentReleased = (agent) => releasedNames.push(agent.name); - relay.onMessageReceived = (msg) => receivedMessages.push(msg); - relay.onMessageSent = (msg) => sentMessages.push(msg); + relay.addListener('agentSpawned', (agent) => spawnedNames.push(agent.name)); + relay.addListener('agentReleased', (agent) => releasedNames.push(agent.name)); + relay.addListener('messageReceived', (msg) => receivedMessages.push(msg)); + relay.addListener('messageSent', (msg) => sentMessages.push(msg)); try { // Spawn two agents in parallel @@ -90,7 +90,7 @@ test('facade: spawn → message → list → release → shutdown', async (t) => assert.equal(msg.from, 'System'); assert.equal(msg.to, codex.name); - // onMessageSent should have fired + // messageSent listener should have fired assert.equal(sentMessages.length, 1); assert.equal(sentMessages[0].text, 'Hello, world!'); @@ -138,7 +138,7 @@ test('facade: agent.sendMessage sends from the agent identity', async (t) => { }); const sentMessages: Message[] = []; - relay.onMessageSent = (msg) => sentMessages.push(msg); + relay.addListener('messageSent', (msg) => sentMessages.push(msg)); try { const [a, b] = await Promise.all([ diff --git a/packages/sdk/src/__tests__/relay-channel-ops.test.ts b/packages/sdk/src/__tests__/relay-channel-ops.test.ts index a3a3cf1c7..06e9009d9 100644 --- a/packages/sdk/src/__tests__/relay-channel-ops.test.ts +++ b/packages/sdk/src/__tests__/relay-channel-ops.test.ts @@ -94,10 +94,10 @@ describe('AgentRelay channel operations', () => { expect(agent.mutedChannels).toEqual([]); }); - it('onChannelSubscribed fires on channel_subscribed events', () => { + it('channelSubscribed listener fires on channel_subscribed events', () => { const { relay, emit } = setupRelay(); const callback = vi.fn(); - relay.onChannelSubscribed = callback; + relay.addListener('channelSubscribed', callback); emit({ kind: 'channel_subscribed', @@ -105,7 +105,7 @@ describe('AgentRelay channel operations', () => { channels: ['ch-a'], }); - expect(callback).toHaveBeenCalledWith('worker-1', ['ch-a']); + expect(callback).toHaveBeenCalledWith({ agent: 'worker-1', channels: ['ch-a'] }); }); // TODO(sdk-test-fix): restore when channel mute/unmute exists end-to-end in the broker and SDK. diff --git a/packages/sdk/src/__tests__/resume-fallback.test.ts b/packages/sdk/src/__tests__/resume-fallback.test.ts index 5f4e4f147..dd02769cd 100644 --- a/packages/sdk/src/__tests__/resume-fallback.test.ts +++ b/packages/sdk/src/__tests__/resume-fallback.test.ts @@ -4,13 +4,7 @@ */ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { - chmodSync, - mkdirSync, - mkdtempSync, - rmSync, - writeFileSync, -} from 'node:fs'; +import { chmodSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from 'node:fs'; import os from 'node:os'; import path from 'node:path'; import type { WorkflowDb } from '../workflows/runner.js'; @@ -65,8 +59,12 @@ let waitForExitFn: (ms?: number) => Promise<'exited' | 'timeout' | 'released'>; const mockAgent = { name: 'test-agent-abc', - get waitForExit() { return waitForExitFn; }, - get waitForIdle() { return vi.fn().mockImplementation(() => new Promise(() => {})); }, + get waitForExit() { + return waitForExitFn; + }, + get waitForIdle() { + return vi.fn().mockImplementation(() => new Promise(() => {})); + }, release: vi.fn().mockResolvedValue(undefined), }; @@ -75,6 +73,12 @@ const mockHuman = { sendMessage: vi.fn().mockResolvedValue(undefined), }; +const mockListeners = new Map void>>(); +function emitMockEvent(event: string, ...args: any[]): void { + const set = mockListeners.get(event); + if (set) for (const cb of set) cb(...args); +} + const mockRelayInstance = { spawnPty: vi.fn().mockImplementation(async ({ name, task }: { name: string; task?: string }) => { const stepComplete = task?.match(/STEP_COMPLETE:([^\n]+)/)?.[1]?.trim(); @@ -86,9 +90,7 @@ const mockRelayInstance = { : 'STEP_COMPLETE:unknown\n'; queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } + emitMockEvent('workerOutput', { name, chunk: output }); }); return { ...mockAgent, name }; @@ -96,13 +98,15 @@ const mockRelayInstance = { human: vi.fn().mockReturnValue(mockHuman), shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentReleased: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, - onDeliveryUpdate: null as any, + addListener: vi.fn((event: string, cb: (...args: any[]) => void) => { + let set = mockListeners.get(event); + if (!set) { + set = new Set(); + mockListeners.set(event, set); + } + set.add(cb); + return () => set!.delete(cb); + }), listAgentsRaw: vi.fn().mockResolvedValue([]), }; @@ -150,9 +154,7 @@ function makeResumeConfig(): RelayYamlConfig { version: '1', name: 'test-resume-fallback', swarm: { pattern: 'dag' }, - agents: [ - { name: 'agent-a', cli: 'claude' }, - ], + agents: [{ name: 'agent-a', cli: 'claude' }], workflows: [ { name: 'default', @@ -172,9 +174,7 @@ function makeTemplateConfig(): RelayYamlConfig { version: '1', name: 'test-resume-template', swarm: { pattern: 'dag' }, - agents: [ - { name: 'agent-a', cli: 'claude' }, - ], + agents: [{ name: 'agent-a', cli: 'claude' }], workflows: [ { name: 'default', @@ -193,7 +193,11 @@ function makeTemplateConfig(): RelayYamlConfig { }; } -function makeRunRow(runId: string, config: RelayYamlConfig, status: WorkflowRunRow['status'] = 'failed'): WorkflowRunRow { +function makeRunRow( + runId: string, + config: RelayYamlConfig, + status: WorkflowRunRow['status'] = 'failed' +): WorkflowRunRow { const now = new Date().toISOString(); return { id: runId, @@ -251,14 +255,18 @@ describe('resume fallback to step-output cache', () => { beforeEach(() => { vi.clearAllMocks(); waitForExitFn = vi.fn().mockResolvedValue('exited'); - mockRelayInstance.onWorkerOutput = null; + mockListeners.clear(); tmpDir = mkdtempSync(path.join(os.tmpdir(), 'resume-fallback-')); db = makeDb(); runner = new WorkflowRunner({ db, workspaceId: 'ws-test', cwd: tmpDir }); }); afterEach(() => { - try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} + try { + rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* noop */ + } }); it('should reconstruct run from step-output cache when JSONL missing', async () => { @@ -391,8 +399,14 @@ describe('file-db append diagnostics', () => { afterEach(() => { try { chmodSync(path.join(tmpDir, 'readonly'), 0o755); - } catch {} - try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} + } catch { + /* noop */ + } + try { + rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* noop */ + } }); it('should warn once when append fails', async () => { diff --git a/packages/sdk/src/__tests__/start-from.test.ts b/packages/sdk/src/__tests__/start-from.test.ts index 23633e8e0..657ff327d 100644 --- a/packages/sdk/src/__tests__/start-from.test.ts +++ b/packages/sdk/src/__tests__/start-from.test.ts @@ -61,8 +61,12 @@ let waitForExitFn: (ms?: number) => Promise<'exited' | 'timeout' | 'released'>; const mockAgent = { name: 'test-agent-abc', - get waitForExit() { return waitForExitFn; }, - get waitForIdle() { return vi.fn().mockImplementation(() => new Promise(() => {})); }, + get waitForExit() { + return waitForExitFn; + }, + get waitForIdle() { + return vi.fn().mockImplementation(() => new Promise(() => {})); + }, release: vi.fn().mockResolvedValue(undefined), }; @@ -71,6 +75,15 @@ const mockHuman = { sendMessage: vi.fn().mockResolvedValue(undefined), }; +// Listener registry for the AgentRelay mock — production AgentRelay uses +// addListener('eventName', handler). Tests fire events via emitRelayEvent. +const relayListeners = new Map void>>(); +function emitRelayEvent(event: string, payload: unknown): void { + for (const handler of relayListeners.get(event) ?? []) { + handler(payload); + } +} + const mockRelayInstance = { spawnPty: vi.fn().mockImplementation(async ({ name, task }: { name: string; task?: string }) => { const stepComplete = task?.match(/STEP_COMPLETE:([^\n]+)/)?.[1]?.trim(); @@ -81,24 +94,22 @@ const mockRelayInstance = { ? `STEP_COMPLETE:${stepComplete}\n` : 'STEP_COMPLETE:unknown\n'; - queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } - }); + queueMicrotask(() => emitRelayEvent('workerOutput', { name, chunk: output })); return { ...mockAgent, name }; }), human: vi.fn().mockReturnValue(mockHuman), shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentReleased: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, - onDeliveryUpdate: null as any, + addListener: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + let set = relayListeners.get(event); + if (!set) { + set = new Set(); + relayListeners.set(event, set); + } + set.add(handler); + return () => set!.delete(handler); + }), listAgentsRaw: vi.fn().mockResolvedValue([]), }; @@ -146,9 +157,7 @@ function makeLinearConfig(): RelayYamlConfig { version: '1', name: 'test-start-from', swarm: { pattern: 'dag' }, - agents: [ - { name: 'agent-a', cli: 'claude' }, - ], + agents: [{ name: 'agent-a', cli: 'claude' }], workflows: [ { name: 'default', @@ -168,9 +177,7 @@ function makeDiamondConfig(): RelayYamlConfig { version: '1', name: 'test-diamond', swarm: { pattern: 'dag' }, - agents: [ - { name: 'agent-a', cli: 'claude' }, - ], + agents: [{ name: 'agent-a', cli: 'claude' }], workflows: [ { name: 'default', @@ -196,7 +203,7 @@ describe('startFrom', () => { beforeEach(() => { vi.clearAllMocks(); waitForExitFn = vi.fn().mockResolvedValue('exited'); - mockRelayInstance.onWorkerOutput = null; + relayListeners.clear(); tmpDir = mkdtempSync(path.join(os.tmpdir(), 'start-from-')); db = makeDb(); runner = new WorkflowRunner({ db, workspaceId: 'ws-test', cwd: tmpDir }); @@ -204,9 +211,9 @@ describe('startFrom', () => { it('should throw when startFrom step does not exist', async () => { const config = makeLinearConfig(); - await expect( - runner.execute(config, 'default', undefined, { startFrom: 'nonexistent' }) - ).rejects.toThrow('startFrom step "nonexistent" not found in workflow'); + await expect(runner.execute(config, 'default', undefined, { startFrom: 'nonexistent' })).rejects.toThrow( + 'startFrom step "nonexistent" not found in workflow' + ); }); it('should skip predecessor steps in a linear chain', async () => { @@ -341,6 +348,10 @@ describe('startFrom', () => { }); afterEach(() => { - try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} + try { + rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* noop */ + } }); }); diff --git a/packages/sdk/src/__tests__/workflow-runner.test.ts b/packages/sdk/src/__tests__/workflow-runner.test.ts index f36c3541a..4444fb7d8 100644 --- a/packages/sdk/src/__tests__/workflow-runner.test.ts +++ b/packages/sdk/src/__tests__/workflow-runner.test.ts @@ -6,7 +6,15 @@ */ import { describe, it, expect, vi, beforeEach } from 'vitest'; -import { existsSync, mkdirSync, mkdtempSync, readFileSync, readdirSync, rmSync, writeFileSync } from 'node:fs'; +import { + existsSync, + mkdirSync, + mkdtempSync, + readFileSync, + readdirSync, + rmSync, + writeFileSync, +} from 'node:fs'; import os from 'node:os'; import path from 'node:path'; import type { WorkflowDb } from '../workflows/runner.js'; @@ -77,6 +85,12 @@ const mockHuman = { sendMessage: vi.fn().mockResolvedValue(undefined), }; +const mockListeners = new Map void>>(); +function emitMockEvent(event: string, ...args: any[]): void { + const set = mockListeners.get(event); + if (set) for (const cb of set) cb(...args); +} + const defaultSpawnPtyImplementation = async ({ name, task }: { name: string; task?: string }) => { const queued = mockSpawnOutputs.shift(); const stepComplete = task?.match(/STEP_COMPLETE:([^\n]+)/)?.[1]?.trim(); @@ -90,9 +104,7 @@ const defaultSpawnPtyImplementation = async ({ name, task }: { name: string; tas : 'STEP_COMPLETE:unknown\n'); queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } + emitMockEvent('workerOutput', { name, chunk: output }); }); return { ...mockAgent, name }; @@ -103,11 +115,15 @@ const mockRelayInstance = { human: vi.fn().mockReturnValue(mockHuman), shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, + addListener: vi.fn((event: string, cb: (...args: any[]) => void) => { + let set = mockListeners.get(event); + if (!set) { + set = new Set(); + mockListeners.set(event, set); + } + set.add(cb); + return () => set!.delete(cb); + }), listAgentsRaw: vi.fn().mockResolvedValue([]), }; @@ -225,7 +241,7 @@ describe('WorkflowRunner', () => { mockSpawnOutputs = []; mockAgent.release.mockResolvedValue(undefined); mockRelayInstance.spawnPty.mockImplementation(defaultSpawnPtyImplementation); - mockRelayInstance.onWorkerOutput = null; + mockListeners.clear(); db = makeDb(); runner = new WorkflowRunner({ db, workspaceId: 'ws-test' }); }); @@ -755,9 +771,7 @@ agents: const output = mockSpawnOutputs.shift() ?? 'LEAD_DONE\n'; queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } + emitMockEvent('workerOutput', { name, chunk: output }); }); return { ...mockAgent, name }; @@ -974,9 +988,7 @@ agents: const output = isOwner ? 'owner checking\n' : 'worker finished\n'; queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } + emitMockEvent('workerOutput', { name, chunk: output }); }); if (isOwner) { diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index c504f1cb9..f2459bfdb 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -41,6 +41,16 @@ import type { SendMessageInput, ListAgent, } from './types.js'; +import { EventBus } from './event-bus.js'; +import type { + AfterAgentReleaseContext, + AfterAgentSpawnContext, + AgentRelayEvents, + BeforeAgentReleaseContext, + BeforeAgentSpawnContext, + BeforeAgentSpawnHandler, + SpawnPatch, +} from './lifecycle-hooks.js'; // ── Types ────────────────────────────────────────────────────────────── @@ -51,6 +61,14 @@ export interface AgentRelayClientOptions { fetch?: typeof globalThis.fetch; /** Timeout in ms for HTTP requests. Default: 30000. */ requestTimeoutMs?: number; + /** + * Shared event bus. When constructed bare, the client owns its own bus + * — listeners registered via `addListener` flow only through this + * client. When passed in (typically by `AgentRelay`), the client uses + * the supplied bus so facade-registered listeners observe call-site + * hooks fired here. + */ + eventBus?: EventBus; } export interface AgentRelayBrokerInitArgs { @@ -83,6 +101,8 @@ export interface AgentRelaySpawnOptions { startupTimeoutMs?: number; /** Timeout in ms for HTTP requests to the broker. Default: 30000. */ requestTimeoutMs?: number; + /** Optional shared event bus — see {@link AgentRelayClientOptions.eventBus}. */ + eventBus?: EventBus; } export interface SessionInfo { @@ -122,6 +142,55 @@ function resolveSpawnTransport(input: SpawnProviderInput): AgentTransport { return input.transport ?? (input.provider === 'opencode' ? 'headless' : 'pty'); } +/** + * Serialize a {@link SpawnPtyInput} for the broker `/api/spawn` endpoint. + * Factored out of {@link AgentRelayClient.spawnPty} so the same shape can + * be applied to the post-`beforeAgentSpawn` resolved input. + */ +function buildSpawnPtyBody(input: SpawnPtyInput): Record { + return { + name: input.name, + cli: input.cli, + ...(input.model !== undefined ? { model: input.model } : {}), + args: input.args ?? [], + ...(input.task !== undefined ? { task: input.task } : {}), + channels: input.channels ?? [], + ...(input.cwd !== undefined ? { cwd: input.cwd } : {}), + ...(input.team !== undefined ? { team: input.team } : {}), + ...(input.agentToken !== undefined ? { agentToken: input.agentToken } : {}), + ...(input.shadowOf !== undefined ? { shadowOf: input.shadowOf } : {}), + ...(input.shadowMode !== undefined ? { shadowMode: input.shadowMode } : {}), + ...(input.continueFrom !== undefined ? { continueFrom: input.continueFrom } : {}), + ...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), + ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), + ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), + }; +} + +function buildSpawnProviderBody( + input: SpawnProviderInput, + transport: AgentTransport +): Record { + return { + name: input.name, + cli: input.provider, + ...(input.model !== undefined ? { model: input.model } : {}), + args: input.args ?? [], + ...(input.task !== undefined ? { task: input.task } : {}), + channels: input.channels ?? [], + ...(input.cwd !== undefined ? { cwd: input.cwd } : {}), + ...(input.team !== undefined ? { team: input.team } : {}), + ...(input.agentToken !== undefined ? { agentToken: input.agentToken } : {}), + ...(input.shadowOf !== undefined ? { shadowOf: input.shadowOf } : {}), + ...(input.shadowMode !== undefined ? { shadowMode: input.shadowMode } : {}), + ...(input.continueFrom !== undefined ? { continueFrom: input.continueFrom } : {}), + ...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), + ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), + ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), + transport, + }; +} + function isProcessRunning(pid: number): boolean { if (!Number.isInteger(pid) || pid <= 0) { return false; @@ -169,8 +238,14 @@ export class AgentRelayClient { private leaseTimer: ReturnType | null = null; workspaceKey?: string; + /** Resolved broker URL — captured so call-site lifecycle contexts can surface it. */ + readonly baseUrl: string; + /** Shared multi-listener registry. Created bare when no `eventBus` is passed in. */ + readonly eventBus: EventBus; constructor(options: AgentRelayClientOptions) { + this.baseUrl = options.baseUrl; + this.eventBus = options.eventBus ?? new EventBus(); this.transport = new BrokerTransport({ baseUrl: options.baseUrl, apiKey: options.apiKey, @@ -179,6 +254,48 @@ export class AgentRelayClient { }); } + /** + * Register a listener on the client's event bus. Returns an unsubscribe + * function. Equivalent to `client.eventBus.addListener(...)` but mirrors + * the `AgentRelay` facade API so direct-client callers don't need to + * reach through `.eventBus`. + */ + addListener( + event: K, + handler: (...args: AgentRelayEvents[K]) => void | Promise + ): () => void { + return this.eventBus.addListener(event, handler); + } + + /** Remove a previously-registered listener. */ + removeListener( + event: K, + handler: (...args: AgentRelayEvents[K]) => void | Promise + ): void { + this.eventBus.removeListener(event, handler); + } + + /** + * Fold `beforeAgentSpawn` patches into the input. Listeners run in + * registration order; each may return a {@link SpawnPatch} that is + * shallow-merged over the running result. Handler exceptions are caught + * and logged but do not abort the chain. + */ + private async runBeforeSpawn(ctx: BeforeAgentSpawnContext): Promise { + let resolved: SpawnPtyInput | SpawnProviderInput = { ...ctx.input }; + for (const handler of this.eventBus.listeners('beforeAgentSpawn') as Array) { + try { + const patch = await handler({ ...ctx, input: resolved as Readonly }); + if (patch && typeof patch === 'object') { + resolved = { ...resolved, ...(patch as SpawnPatch) } as SpawnPtyInput | SpawnProviderInput; + } + } catch (err) { + console.error('[agent-relay] beforeAgentSpawn listener threw:', err); + } + } + return resolved; + } + /** * Connect to an already-running broker by reading its connection file. * @@ -188,7 +305,11 @@ export class AgentRelayClient { * @param cwd — project directory (default: process.cwd()) * @param connectionPath — explicit path to connection.json (overrides cwd) */ - static connect(options?: { cwd?: string; connectionPath?: string }): AgentRelayClient { + static connect(options?: { + cwd?: string; + connectionPath?: string; + eventBus?: EventBus; + }): AgentRelayClient { const cwd = options?.cwd ?? process.cwd(); const stateDir = process.env.AGENT_RELAY_STATE_DIR; const connPath = @@ -220,7 +341,11 @@ export class AgentRelayClient { ); } - return new AgentRelayClient({ baseUrl: conn.url, apiKey: conn.api_key }); + return new AgentRelayClient({ + baseUrl: conn.url, + apiKey: conn.api_key, + ...(options?.eventBus ? { eventBus: options.eventBus } : {}), + }); } /** @@ -291,6 +416,7 @@ export class AgentRelayClient { baseUrl, apiKey, requestTimeoutMs: options?.requestTimeoutMs, + ...(options?.eventBus ? { eventBus: options.eventBus } : {}), }); client.child = child; @@ -398,26 +524,26 @@ export class AgentRelayClient { // ── Agent lifecycle ──────────────────────────────────────────────── async spawnPty(input: SpawnPtyInput): Promise<{ name: string; runtime: AgentRuntime }> { - return this.transport.request('/api/spawn', { - method: 'POST', - body: JSON.stringify({ - name: input.name, - cli: input.cli, - ...(input.model !== undefined ? { model: input.model } : {}), - args: input.args ?? [], - ...(input.task !== undefined ? { task: input.task } : {}), - channels: input.channels ?? [], - ...(input.cwd !== undefined ? { cwd: input.cwd } : {}), - ...(input.team !== undefined ? { team: input.team } : {}), - ...(input.agentToken !== undefined ? { agentToken: input.agentToken } : {}), - ...(input.shadowOf !== undefined ? { shadowOf: input.shadowOf } : {}), - ...(input.shadowMode !== undefined ? { shadowMode: input.shadowMode } : {}), - ...(input.continueFrom !== undefined ? { continueFrom: input.continueFrom } : {}), - ...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), - ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), - ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), - }), - }); + const beforeCtx: BeforeAgentSpawnContext = { + kind: 'pty', + input, + spawnerPid: process.pid, + spawnStartTs: new Date().toISOString(), + baseUrl: this.baseUrl, + }; + const t0 = Date.now(); + const resolvedInput = (await this.runBeforeSpawn(beforeCtx)) as SpawnPtyInput; + try { + const result = await this.transport.request<{ name: string; runtime: AgentRuntime }>('/api/spawn', { + method: 'POST', + body: JSON.stringify(buildSpawnPtyBody(resolvedInput)), + }); + await this.emitAfterSpawn(beforeCtx, resolvedInput, t0, result, undefined); + return result; + } catch (err) { + await this.emitAfterSpawn(beforeCtx, resolvedInput, t0, undefined, err); + throw err; + } } async spawnProvider(input: SpawnProviderInput): Promise<{ name: string; runtime: AgentRuntime }> { @@ -428,27 +554,26 @@ export class AgentRelayClient { ); } - return this.transport.request('/api/spawn', { - method: 'POST', - body: JSON.stringify({ - name: input.name, - cli: input.provider, - ...(input.model !== undefined ? { model: input.model } : {}), - args: input.args ?? [], - ...(input.task !== undefined ? { task: input.task } : {}), - channels: input.channels ?? [], - ...(input.cwd !== undefined ? { cwd: input.cwd } : {}), - ...(input.team !== undefined ? { team: input.team } : {}), - ...(input.agentToken !== undefined ? { agentToken: input.agentToken } : {}), - ...(input.shadowOf !== undefined ? { shadowOf: input.shadowOf } : {}), - ...(input.shadowMode !== undefined ? { shadowMode: input.shadowMode } : {}), - ...(input.continueFrom !== undefined ? { continueFrom: input.continueFrom } : {}), - ...(input.idleThresholdSecs !== undefined ? { idleThresholdSecs: input.idleThresholdSecs } : {}), - ...(input.restartPolicy !== undefined ? { restartPolicy: input.restartPolicy } : {}), - ...(input.skipRelayPrompt !== undefined ? { skipRelayPrompt: input.skipRelayPrompt } : {}), - transport, - }), - }); + const beforeCtx: BeforeAgentSpawnContext = { + kind: 'provider', + input, + spawnerPid: process.pid, + spawnStartTs: new Date().toISOString(), + baseUrl: this.baseUrl, + }; + const t0 = Date.now(); + const resolvedInput = (await this.runBeforeSpawn(beforeCtx)) as SpawnProviderInput; + try { + const result = await this.transport.request<{ name: string; runtime: AgentRuntime }>('/api/spawn', { + method: 'POST', + body: JSON.stringify(buildSpawnProviderBody(resolvedInput, transport)), + }); + await this.emitAfterSpawn(beforeCtx, resolvedInput, t0, result, undefined); + return result; + } catch (err) { + await this.emitAfterSpawn(beforeCtx, resolvedInput, t0, undefined, err); + throw err; + } } async spawnHeadless(input: SpawnHeadlessInput): Promise<{ name: string; runtime: AgentRuntime }> { @@ -468,10 +593,49 @@ export class AgentRelayClient { } async release(name: string, reason?: string): Promise<{ name: string }> { - return this.transport.request(`/api/spawned/${encodeURIComponent(name)}`, { - method: 'DELETE', - ...(reason ? { body: JSON.stringify({ reason }) } : {}), - }); + const beforeCtx: BeforeAgentReleaseContext = { name, reason, baseUrl: this.baseUrl }; + const t0 = Date.now(); + await this.eventBus.emit('beforeAgentRelease', beforeCtx); + try { + const result = await this.transport.request<{ name: string }>( + `/api/spawned/${encodeURIComponent(name)}`, + { + method: 'DELETE', + ...(reason ? { body: JSON.stringify({ reason }) } : {}), + } + ); + const afterCtx: AfterAgentReleaseContext = { + ...beforeCtx, + durationMs: Date.now() - t0, + }; + await this.eventBus.emit('afterAgentRelease', afterCtx); + return result; + } catch (err) { + const afterCtx: AfterAgentReleaseContext = { + ...beforeCtx, + error: err instanceof Error ? err : new Error(String(err)), + durationMs: Date.now() - t0, + }; + await this.eventBus.emit('afterAgentRelease', afterCtx); + throw err; + } + } + + private async emitAfterSpawn( + beforeCtx: BeforeAgentSpawnContext, + resolvedInput: SpawnPtyInput | SpawnProviderInput, + startMs: number, + result: { name: string; runtime: AgentRuntime } | undefined, + error: unknown + ): Promise { + const afterCtx: AfterAgentSpawnContext = { + ...beforeCtx, + resolvedInput, + ...(result ? { result } : {}), + ...(error !== undefined ? { error: error instanceof Error ? error : new Error(String(error)) } : {}), + durationMs: Date.now() - startMs, + }; + await this.eventBus.emit('afterAgentSpawn', afterCtx); } async listAgents(): Promise { diff --git a/packages/sdk/src/event-bus.ts b/packages/sdk/src/event-bus.ts new file mode 100644 index 000000000..1867b1503 --- /dev/null +++ b/packages/sdk/src/event-bus.ts @@ -0,0 +1,91 @@ +/** + * Typed multi-listener registry shared by `AgentRelay` and `AgentRelayClient`. + * + * Replaces the previous single-callback `on*: EventHook = null` fields on + * `AgentRelay` so multiple integrations (burn stamping, Pear UI, third-party + * observers, …) can subscribe to the same event without stepping on each + * other. + * + * Each `addListener(event, handler)` returns an unsubscribe function; + * `removeListener(event, handler)` is also available. Handlers fire in + * registration order; async handlers are awaited sequentially. Handler + * exceptions are caught and logged to `console.error` so one bad listener + * never blocks the originating operation or subsequent listeners. + * + * The bus is intentionally generic — callers parameterize it with a typed + * event map (see `AgentRelayEvents` in `./lifecycle-hooks.ts`) so each + * event's payload is fully checked at the addListener / emit boundary. + */ +export type EventMap = Record; + +export type EventHandler = (...args: Args) => void | Promise; + +export class EventBus { + private handlers: Map>> = new Map(); + + /** + * Register a handler for `event`. Returns an unsubscribe function that + * removes the handler when called. + */ + addListener(event: K, handler: EventHandler): () => void { + let set = this.handlers.get(event); + if (!set) { + set = new Set(); + this.handlers.set(event, set); + } + set.add(handler as EventHandler); + return () => { + // Re-read the current Set from the map so a stale closure can't blow + // away the new Set if the caller unsubscribes, re-registers a fresh + // handler under the same event, then calls the original `off` again. + // Without this identity check the double-`off()` would silently drop + // every listener for the event. + const current = this.handlers.get(event); + if (current !== set) return; + current.delete(handler as EventHandler); + if (current.size === 0) { + this.handlers.delete(event); + } + }; + } + + /** Remove a previously-registered handler. Idempotent. */ + removeListener(event: K, handler: EventHandler): void { + const set = this.handlers.get(event); + if (!set) return; + set.delete(handler as EventHandler); + if (set.size === 0) { + this.handlers.delete(event); + } + } + + /** Number of currently-registered handlers for `event`. Useful for tests. */ + listenerCount(event: K): number { + return this.handlers.get(event)?.size ?? 0; + } + + /** Snapshot the handlers for `event` so iteration is safe under concurrent mutation. */ + listeners(event: K): Array> { + const set = this.handlers.get(event); + return set ? (Array.from(set) as Array>) : []; + } + + /** + * Fire `event` with `args`, awaiting each handler sequentially in + * registration order. Handler exceptions are caught and logged; they + * never abort the dispatch chain. + * + * Return value is intentionally `void`; consumers that need to collect + * patches (e.g. `beforeAgentSpawn`'s shallow-merge contract) iterate + * `listeners()` directly so they can capture each handler's return. + */ + async emit(event: K, ...args: E[K]): Promise { + for (const handler of this.listeners(event)) { + try { + await handler(...args); + } catch (err) { + console.error(`[agent-relay] listener for "${String(event)}" threw:`, err); + } + } + } +} diff --git a/packages/sdk/src/examples/demo.ts b/packages/sdk/src/examples/demo.ts index 6b278ec18..b433ca7f2 100644 --- a/packages/sdk/src/examples/demo.ts +++ b/packages/sdk/src/examples/demo.ts @@ -5,67 +5,67 @@ * Run: * npm run build && npm run demo */ -import { AgentRelay } from "../relay.js"; +import { AgentRelay } from '../relay.js'; const relay = new AgentRelay({ env: process.env }); // ── Event hooks ───────────────────────────────────────────────────────────── -relay.onMessageReceived = (message) => { +relay.addListener('messageReceived', (message) => { console.log(` 📨 received │ from=${message.from} to=${message.to} text="${message.text}"`); -}; +}); -relay.onMessageSent = (message) => { +relay.addListener('messageSent', (message) => { console.log(` 📤 sent │ from=${message.from} to=${message.to} text="${message.text}"`); -}; +}); -relay.onAgentSpawned = (agent) => { +relay.addListener('agentSpawned', (agent) => { console.log(` 🟢 spawned │ ${agent.name} (${agent.runtime})`); -}; +}); -relay.onAgentReleased = (agent) => { +relay.addListener('agentReleased', (agent) => { console.log(` 🔴 released │ ${agent.name}`); -}; +}); -relay.onAgentExited = (agent) => { +relay.addListener('agentExited', (agent) => { console.log(` ⚪ exited │ ${agent.name}`); -}; +}); // ── Spawn agents ──────────────────────────────────────────────────────────── -console.log("\n─── Spawning agents ───\n"); +console.log('\n─── Spawning agents ───\n'); const [agentA, agentB] = await Promise.all([ - relay.spawnPty({ name: "AgentA", cli: "claude", args: ["--print"], channels: ["general"] }), - relay.spawnPty({ name: "AgentB", cli: "claude", args: ["--print"], channels: ["general"] }), + relay.spawnPty({ name: 'AgentA', cli: 'claude', args: ['--print'], channels: ['general'] }), + relay.spawnPty({ name: 'AgentB', cli: 'claude', args: ['--print'], channels: ['general'] }), ]); // ── Send messages ─────────────────────────────────────────────────────────── -console.log("\n─── Sending messages ───\n"); +console.log('\n─── Sending messages ───\n'); -const human = relay.human({ name: "System" }); -await human.sendMessage({ to: agentA.name, text: "Hello AgentA, welcome!" }); -await human.sendMessage({ to: agentB.name, text: "Hello AgentB, welcome!" }); +const human = relay.human({ name: 'System' }); +await human.sendMessage({ to: agentA.name, text: 'Hello AgentA, welcome!' }); +await human.sendMessage({ to: agentB.name, text: 'Hello AgentB, welcome!' }); // Agent-to-agent messaging -await agentA.sendMessage({ to: agentB.name, text: "Hey B, got a task for you" }); -await agentB.sendMessage({ to: agentA.name, text: "On it!" }); +await agentA.sendMessage({ to: agentB.name, text: 'Hey B, got a task for you' }); +await agentB.sendMessage({ to: agentA.name, text: 'On it!' }); // Threaded conversation -const thread = await human.sendMessage({ to: agentA.name, text: "Status update?" }); -await agentA.sendMessage({ to: human.name, text: "All good!", threadId: thread.eventId }); +const thread = await human.sendMessage({ to: agentA.name, text: 'Status update?' }); +await agentA.sendMessage({ to: human.name, text: 'All good!', threadId: thread.eventId }); // Priority messages -await human.sendMessage({ to: agentA.name, text: "Critical alert!", priority: 0 }); -await human.sendMessage({ to: agentB.name, text: "Low priority FYI", priority: 4 }); +await human.sendMessage({ to: agentA.name, text: 'Critical alert!', priority: 0 }); +await human.sendMessage({ to: agentB.name, text: 'Low priority FYI', priority: 4 }); // Small delay to let events propagate await new Promise((r) => setTimeout(r, 500)); // ── List agents ───────────────────────────────────────────────────────────── -console.log("\n─── Active agents ───\n"); +console.log('\n─── Active agents ───\n'); const agents = await relay.listAgents(); for (const agent of agents) { @@ -74,7 +74,7 @@ for (const agent of agents) { // ── Release all ───────────────────────────────────────────────────────────── -console.log("\n─── Releasing agents ───\n"); +console.log('\n─── Releasing agents ───\n'); for (const agent of agents) { await agent.release(); @@ -85,4 +85,4 @@ await new Promise((r) => setTimeout(r, 300)); // ── Shutdown ──────────────────────────────────────────────────────────────── await relay.shutdown(); -console.log("\n─── Done ───\n"); +console.log('\n─── Done ───\n'); diff --git a/packages/sdk/src/examples/persona-spawn.ts b/packages/sdk/src/examples/persona-spawn.ts index 72cf34884..482f60436 100644 --- a/packages/sdk/src/examples/persona-spawn.ts +++ b/packages/sdk/src/examples/persona-spawn.ts @@ -12,24 +12,23 @@ * Environment: * RELAY_API_KEY — Relaycast workspace key (required) */ -import { AgentRelay } from "../relay.js"; -import { listPersonas } from "../personas.js"; +import { AgentRelay } from '../relay.js'; +import { listPersonas } from '../personas.js'; const [, , personaId, ...taskParts] = process.argv; -const task = taskParts.join(" ").trim(); +const task = taskParts.join(' ').trim(); if (!personaId) { const found = listPersonas(); - console.error("Usage: persona-spawn [task...]\n"); + console.error('Usage: persona-spawn [task...]\n'); if (found.length > 0) { - console.error("Personas discovered in the default cascade:"); + console.error('Personas discovered in the default cascade:'); for (const p of found) { console.error(` - ${p.id} (${p.path})`); } } else { console.error( - "No personas found. Place JSON files under ./agentworkforce/personas " + - "or set AGENT_WORKFORCE_HOME.", + 'No personas found. Place JSON files under ./agentworkforce/personas ' + 'or set AGENT_WORKFORCE_HOME.' ); } process.exit(1); @@ -37,13 +36,14 @@ if (!personaId) { const relay = new AgentRelay(); -relay.onAgentSpawned = (agent) => console.log(`spawned ${agent.name} (${agent.runtime})`); -relay.onAgentExited = (agent) => - console.log(`exited ${agent.name} code=${agent.exitCode ?? "none"}`); +relay.addListener('agentSpawned', (agent) => console.log(`spawned ${agent.name} (${agent.runtime})`)); +relay.addListener('agentExited', (agent) => + console.log(`exited ${agent.name} code=${agent.exitCode ?? 'none'}`) +); const agent = await relay.spawnPersona(personaId, { ...(task ? { task } : {}), - channels: ["general"], + channels: ['general'], }); console.log(`agent ${agent.name} ready, waiting for exit...`); diff --git a/packages/sdk/src/examples/quickstart.ts b/packages/sdk/src/examples/quickstart.ts index ad78d4c14..45bbc7d87 100644 --- a/packages/sdk/src/examples/quickstart.ts +++ b/packages/sdk/src/examples/quickstart.ts @@ -8,7 +8,7 @@ * RELAY_API_KEY — Relaycast workspace key (required) * AGENT_RELAY_BIN — path to agent-relay binary (optional) */ -import { AgentRelay } from "../relay.js"; +import { AgentRelay } from '../relay.js'; // The Relay is the communication backbone for your agents. // Drop it into your codebase and let your agents communicate. @@ -16,25 +16,25 @@ const relay = new AgentRelay(); // ── Event hooks ───────────────────────────────────────────────────────────── -relay.onMessageReceived = (message) => { +relay.addListener('messageReceived', (message) => { console.log(`message received → from=${message.from} to=${message.to}`); -}; +}); -relay.onMessageSent = (message) => { +relay.addListener('messageSent', (message) => { console.log(`message sent → from=${message.from} to=${message.to}`); -}; +}); -relay.onAgentSpawned = (agent) => { +relay.addListener('agentSpawned', (agent) => { console.log(`agent spawned → ${agent.name} (${agent.runtime})`); -}; +}); -relay.onAgentReleased = (agent) => { +relay.addListener('agentReleased', (agent) => { console.log(`agent released → ${agent.name}`); -}; +}); -relay.onAgentExited = (agent) => { +relay.addListener('agentExited', (agent) => { console.log(`agent exited → ${agent.name}`); -}; +}); // ── Create agents with sane defaults, running locally ─────────────────────── @@ -47,16 +47,16 @@ const [codex, claude, gemini] = await Promise.all([ // ── Configure messaging with custom CLI agents ───────────────────────────── const worker1 = await relay.spawnPty({ - name: "Worker1", - cli: "codex", - args: ["--model", "gpt-5"], - channels: ["general"], + name: 'Worker1', + cli: 'codex', + args: ['--model', 'gpt-5'], + channels: ['general'], }); // ── Control messaging from non-agent sources ──────────────────────────────── -const human = relay.human({ name: "System" }); -await human.sendMessage({ to: codex.name, text: "Hello, world!" }); +const human = relay.human({ name: 'System' }); +await human.sendMessage({ to: codex.name, text: 'Hello, world!' }); // ── List agents ───────────────────────────────────────────────────────────── diff --git a/packages/sdk/src/examples/ralph-loop.ts b/packages/sdk/src/examples/ralph-loop.ts index f3ca70c8a..f165753a0 100644 --- a/packages/sdk/src/examples/ralph-loop.ts +++ b/packages/sdk/src/examples/ralph-loop.ts @@ -25,9 +25,9 @@ * https://github.com/snarktank/ralph * https://ghuntley.com/ralph/ */ -import fs from "node:fs"; -import { execSync } from "node:child_process"; -import { AgentRelay, type Agent, type Message } from "../relay.js"; +import fs from 'node:fs'; +import { execSync } from 'node:child_process'; +import { AgentRelay, type Agent, type Message } from '../relay.js'; // ── Types ─────────────────────────────────────────────────────────────────── @@ -46,22 +46,22 @@ interface Prd { // ── Configuration ─────────────────────────────────────────────────────────── -const PRD_PATH = process.env.PRD_PATH ?? "prd.json"; -const PROGRESS_PATH = process.env.PROGRESS_PATH ?? "progress.txt"; +const PRD_PATH = process.env.PRD_PATH ?? 'prd.json'; +const PROGRESS_PATH = process.env.PROGRESS_PATH ?? 'progress.txt'; const MAX_ITERATIONS = Number(process.env.MAX_ITERATIONS ?? 10); const MAX_REVIEW_ROUNDS = Number(process.env.MAX_REVIEW_ROUNDS ?? 2); -const QUALITY_CMD = process.env.QUALITY_CMD ?? "npm run check"; +const QUALITY_CMD = process.env.QUALITY_CMD ?? 'npm run check'; /** Max time (ms) to wait for both agents per round before releasing them. */ const ROUND_TIMEOUT_MS = Number(process.env.ROUND_TIMEOUT_MS ?? 5 * 60 * 1000); // ── Helpers ───────────────────────────────────────────────────────────────── function loadPrd(): Prd { - return JSON.parse(fs.readFileSync(PRD_PATH, "utf-8")); + return JSON.parse(fs.readFileSync(PRD_PATH, 'utf-8')); } function savePrd(prd: Prd): void { - fs.writeFileSync(PRD_PATH, JSON.stringify(prd, null, 2) + "\n"); + fs.writeFileSync(PRD_PATH, JSON.stringify(prd, null, 2) + '\n'); } function appendProgress(entry: string): void { @@ -70,9 +70,7 @@ function appendProgress(entry: string): void { } function readProgress(): string { - return fs.existsSync(PROGRESS_PATH) - ? fs.readFileSync(PROGRESS_PATH, "utf-8") - : ""; + return fs.existsSync(PROGRESS_PATH) ? fs.readFileSync(PROGRESS_PATH, 'utf-8') : ''; } function nextStory(prd: Prd): Story | undefined { @@ -81,7 +79,7 @@ function nextStory(prd: Prd): Story | undefined { function runQualityChecks(): { passed: boolean; output: string } { try { - const output = execSync(QUALITY_CMD, { encoding: "utf-8", stdio: "pipe" }); + const output = execSync(QUALITY_CMD, { encoding: 'utf-8', stdio: 'pipe' }); return { passed: true, output }; } catch (err: unknown) { const output = (err as { stdout?: string }).stdout ?? String(err); @@ -92,7 +90,7 @@ function runQualityChecks(): { passed: boolean; output: string } { // ── Prompt builders ───────────────────────────────────────────────────────── function architectPrompt(story: Story, progress: string): string { - const criteria = story.acceptanceCriteria.map((c) => ` - ${c}`).join("\n"); + const criteria = story.acceptanceCriteria.map((c) => ` - ${c}`).join('\n'); return [ `## Architect: ${story.title}`, ``, @@ -111,7 +109,7 @@ function architectPrompt(story: Story, progress: string): string { criteria, ``, `### Previous Learnings`, - progress || "(first story)", + progress || '(first story)', ``, `### Your job`, `1. Post a concise implementation plan to #general (files, changes, edge cases)`, @@ -123,11 +121,11 @@ function architectPrompt(story: Story, progress: string): string { ``, `IMPORTANT: You MUST use Relaycast MCP tools to post messages. This is how`, `the orchestrator knows you're done. Post your plan first, then your verdict.`, - ].join("\n"); + ].join('\n'); } function builderPrompt(story: Story, progress: string, reviewFeedback?: string): string { - const criteria = story.acceptanceCriteria.map((c) => ` - ${c}`).join("\n"); + const criteria = story.acceptanceCriteria.map((c) => ` - ${c}`).join('\n'); const sections = [ `## Builder: ${story.title}`, ``, @@ -147,17 +145,13 @@ function builderPrompt(story: Story, progress: string, reviewFeedback?: string): ]; if (reviewFeedback) { - sections.push( - ``, - `### Review Feedback (fix these issues first)`, - reviewFeedback, - ); + sections.push(``, `### Review Feedback (fix these issues first)`, reviewFeedback); } sections.push( ``, `### Previous Learnings`, - progress || "(first story)", + progress || '(first story)', ``, `### Your job`, `1. Read the Architect's plan from #general`, @@ -166,10 +160,10 @@ function builderPrompt(story: Story, progress: string, reviewFeedback?: string): `4. When done, post exactly "IMPLEMENTATION COMPLETE" to #general`, ``, `IMPORTANT: You MUST use Relaycast MCP tools to post messages. This is how`, - `the orchestrator knows you're done.`, + `the orchestrator knows you're done.` ); - return sections.join("\n"); + return sections.join('\n'); } // ── Main loop ─────────────────────────────────────────────────────────────── @@ -178,18 +172,18 @@ const relay = new AgentRelay({ env: process.env }); const channelLog: Message[] = []; -relay.onMessageReceived = (msg) => { +relay.addListener('messageReceived', (msg) => { channelLog.push(msg); console.log(` 💬 ${msg.from}: "${msg.text.slice(0, 80)}…"`); -}; +}); -relay.onAgentSpawned = (agent) => console.log(` 🟢 ${agent.name} spawned (${agent.runtime})`); -relay.onAgentReleased = (agent) => console.log(` 🔴 ${agent.name} released`); -relay.onAgentExited = (agent) => console.log(` ⚪ ${agent.name} exited`); -relay.onMessageSent = (msg) => console.log(` 📤 → ${msg.to}: "${msg.text.slice(0, 60)}…"`); +relay.addListener('agentSpawned', (agent) => console.log(` 🟢 ${agent.name} spawned (${agent.runtime})`)); +relay.addListener('agentReleased', (agent) => console.log(` 🔴 ${agent.name} released`)); +relay.addListener('agentExited', (agent) => console.log(` ⚪ ${agent.name} exited`)); +relay.addListener('messageSent', (msg) => console.log(` 📤 → ${msg.to}: "${msg.text.slice(0, 60)}…"`)); const prd = loadPrd(); -const orchestrator = relay.human({ name: "Ralph" }); +const orchestrator = relay.human({ name: 'Ralph' }); console.log(`\n══ Ralph Loop (Claude + Codex) ══`); console.log(` branch: ${prd.branchName}`); @@ -211,7 +205,7 @@ while (iteration < MAX_ITERATIONS) { let storyPassed = false; for (let round = 0; round < MAX_REVIEW_ROUNDS; round++) { - const roundLabel = round === 0 ? "initial" : `fix-${round}`; + const roundLabel = round === 0 ? 'initial' : `fix-${round}`; const progress = readProgress(); // ── Spawn both agents concurrently ────────────────────────────────── @@ -220,12 +214,12 @@ while (iteration < MAX_ITERATIONS) { const [architect, builder] = await Promise.all([ relay.claude.spawn({ name: `Architect-${story.id}-${roundLabel}`, - channels: ["general"], + channels: ['general'], }), relay.codex.spawn({ name: `Builder-${story.id}-${roundLabel}`, - args: ["--full-auto"], - channels: ["general"], + args: ['--full-auto'], + channels: ['general'], }), ]); @@ -258,9 +252,7 @@ while (iteration < MAX_ITERATIONS) { const recent = channelLog.slice(startLen); // Check if Claude posted a review verdict - const review = recent.find( - (m) => m.text.includes("REVIEW:PASS") || m.text.includes("REVIEW:FAIL"), - ); + const review = recent.find((m) => m.text.includes('REVIEW:PASS') || m.text.includes('REVIEW:FAIL')); if (review) { verdict = review.text; console.log(` 📋 Claude posted verdict`); @@ -268,7 +260,7 @@ while (iteration < MAX_ITERATIONS) { } // Check if Codex signaled completion (Claude may still be reviewing) - const implDone = recent.find((m) => m.text.includes("IMPLEMENTATION COMPLETE")); + const implDone = recent.find((m) => m.text.includes('IMPLEMENTATION COMPLETE')); if (implDone) { console.log(` 📋 Codex finished, waiting for Claude's review…`); // Give Claude up to 60s more to post a verdict @@ -276,10 +268,11 @@ while (iteration < MAX_ITERATIONS) { while (Date.now() < reviewDeadline) { await new Promise((r) => setTimeout(r, 3000)); const afterImpl = channelLog.slice(startLen); - const rv = afterImpl.find( - (m) => m.text.includes("REVIEW:PASS") || m.text.includes("REVIEW:FAIL"), - ); - if (rv) { verdict = rv.text; break; } + const rv = afterImpl.find((m) => m.text.includes('REVIEW:PASS') || m.text.includes('REVIEW:FAIL')); + if (rv) { + verdict = rv.text; + break; + } } break; } @@ -287,14 +280,18 @@ while (iteration < MAX_ITERATIONS) { // Check if either agent exited on its own const archResult = await architect.waitForExit(0); const buildResult = await builder.waitForExit(0); - if (archResult !== "timeout" && buildResult !== "timeout") break; + if (archResult !== 'timeout' && buildResult !== 'timeout') break; await new Promise((r) => setTimeout(r, 5000)); } // Release both agents (they're interactive so won't exit on their own) const cleanup = async (agent: Agent) => { - try { await agent.release(); } catch { /* already exited */ } + try { + await agent.release(); + } catch { + /* already exited */ + } }; await Promise.all([cleanup(architect), cleanup(builder)]); @@ -310,17 +307,15 @@ while (iteration < MAX_ITERATIONS) { } // ── Check verdict ───────────────────────────────────────────────────── - if (verdict?.includes("REVIEW:PASS")) { + if (verdict?.includes('REVIEW:PASS')) { storyPassed = true; appendProgress(`✅ ${story.id} — ${story.title} — PASSED (round=${roundLabel})`); console.log(` ✅ story passed!\n`); break; - } else if (verdict?.includes("REVIEW:FAIL")) { - const failText = verdict.replace("REVIEW:FAIL", "").trim(); + } else if (verdict?.includes('REVIEW:FAIL')) { + const failText = verdict.replace('REVIEW:FAIL', '').trim(); reviewFeedback = failText; - appendProgress( - `🔄 ${story.id} round=${roundLabel} — review failed: ${reviewFeedback.slice(0, 200)}`, - ); + appendProgress(`🔄 ${story.id} round=${roundLabel} — review failed: ${reviewFeedback.slice(0, 200)}`); console.log(` 🔄 review failed, starting new round\n`); } else { // No verdict from Claude — quality passed so accept it diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index ea9bf6028..e11bf666d 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -17,6 +17,20 @@ export { type SessionInfo, type WorkerStreamSubscriptionOptions, } from './client.js'; +export { EventBus, type EventHandler, type EventMap } from './event-bus.js'; +export type { + AfterAgentReleaseContext, + AfterAgentSpawnContext, + AgentRelayEvents, + BeforeAgentReleaseContext, + BeforeAgentSpawnContext, + BeforeAgentSpawnHandler, + SpawnPatch, + AgentExitRequestedPayload, + AgentIdlePayload, + ChannelSubscriptionPayload, + WorkerOutputPayload, +} from './lifecycle-hooks.js'; export * from './models.js'; export { RelayCast, RelayError, AgentClient } from '@relaycast/sdk'; export type { RelayCastOptions, ClientOptions } from '@relaycast/sdk'; diff --git a/packages/sdk/src/lifecycle-hooks.ts b/packages/sdk/src/lifecycle-hooks.ts new file mode 100644 index 000000000..ed06dee0b --- /dev/null +++ b/packages/sdk/src/lifecycle-hooks.ts @@ -0,0 +1,163 @@ +/** + * Typed lifecycle-hook surface for `AgentRelay` / `AgentRelayClient`. + * + * Two kinds of events flow through the same registry: + * + * 1. **Broker events** — `agentSpawned`, `agentReleased`, `agentExited`, + * `agentReady`, `agentIdle`, `agentExitRequested`, + * `agentActivityChanged`, `messageReceived`, `messageSent`, + * `workerOutput`, `deliveryUpdate`, `channelSubscribed`, + * `channelUnsubscribed`. These fire when the broker emits the + * corresponding event over the WS stream. + * 2. **Call-site hooks** — `beforeAgentSpawn`, `afterAgentSpawn`, + * `beforeAgentRelease`, `afterAgentRelease`. These fire at the SDK + * call site (before / after the HTTP request), so handlers can + * observe — and, for `beforeAgentSpawn`, *modify* — the spawn input + * before it reaches the broker. + * + * The `beforeAgentSpawn` contract is the only one that supports + * mutation: handlers may return a {@link SpawnPatch} to merge into the + * input via shallow merge in registration order. All other hooks are + * observe-only — return type `void | Promise`. + * + * Types from `relay.ts` are pulled in with `import type` to avoid a + * runtime circular import; TypeScript erases type-only imports so the + * runtime dependency graph stays unidirectional + * (`relay.ts` → `event-bus.ts` → no further). + */ + +import type { AgentRuntime, BrokerEvent } from './protocol.js'; +import type { Agent, AgentActivityChange, Message } from './relay.js'; +import type { SpawnPtyInput, SpawnProviderInput } from './types.js'; + +// ── SpawnPatch ───────────────────────────────────────────────────────────── + +/** + * The subset of {@link SpawnPtyInput} / {@link SpawnProviderInput} fields a + * `beforeAgentSpawn` handler may patch. Keeping this narrower than the full + * input type stops handlers from rewriting identity (`name`, `cli`, + * `provider`, `cwd`) — those need to come from the caller. + * + * For array fields (`args`, `channels`) a patch *replaces* the array. To + * extend rather than replace, spread the current value: + * + * ```ts + * relay.addListener('beforeAgentSpawn', (ctx) => ({ + * args: [...(ctx.input.args ?? []), '--session-id', uuid], + * })); + * ``` + * + * When multiple handlers return patches, they merge in registration order + * via shallow `Object.assign` — later handlers override earlier ones for + * the same key. + */ +export type SpawnPatch = Partial< + Pick +>; + +// ── Call-site contexts ───────────────────────────────────────────────────── + +export interface BeforeAgentSpawnContext { + /** Which spawn API was called. */ + kind: 'pty' | 'provider'; + /** Raw input the caller passed in. Treat as read-only — return a {@link SpawnPatch} to modify. */ + input: Readonly; + /** `process.pid` of the calling Node process. Useful for burn-style stamping. */ + spawnerPid: number; + /** ISO timestamp captured the instant the hook chain started. */ + spawnStartTs: string; + /** Resolved broker base URL the spawn will POST to. */ + baseUrl: string; +} + +export type BeforeAgentSpawnHandler = ( + ctx: BeforeAgentSpawnContext +) => void | SpawnPatch | Promise; + +export interface AfterAgentSpawnContext extends BeforeAgentSpawnContext { + /** Final input that was sent to the broker — original input merged with every handler's patch. */ + resolvedInput: SpawnPtyInput | SpawnProviderInput; + /** Broker reply on success. */ + result?: { name: string; runtime: AgentRuntime }; + /** Set when the broker call rejected. Mutually exclusive with `result`. */ + error?: Error; + /** Wall-clock duration from `beforeAgentSpawn` start to here. */ + durationMs: number; +} + +export interface BeforeAgentReleaseContext { + name: string; + reason?: string; + baseUrl: string; +} + +export interface AfterAgentReleaseContext extends BeforeAgentReleaseContext { + error?: Error; + durationMs: number; +} + +// ── Broker-event payload shapes ──────────────────────────────────────────── + +export interface AgentIdlePayload { + name: string; + idleSecs: number; +} + +export interface AgentExitRequestedPayload { + name: string; + reason: string; +} + +export interface WorkerOutputPayload { + name: string; + stream: string; + chunk: string; +} + +/** + * Object-shaped payload for channel subscribe / unsubscribe events. The + * pre-2.x single-callback fields took two positional args + * (`(agent, channels) => void`); the registry standardizes on an object + * payload so all events share the one-arg shape and future fields can be + * added without breaking handlers. + */ +export interface ChannelSubscriptionPayload { + agent: string; + channels: string[]; +} + +// ── Event map ────────────────────────────────────────────────────────────── + +/** + * Typed event map consumed by the {@link EventBus} that backs + * `AgentRelay.addListener` / `removeListener`. + * + * Each entry's tuple is the handler argument list (always length 1 here — + * payloads are objects rather than positional args). + * + * Declared as a `type` alias rather than an `interface` so it satisfies + * the `EventBus` constraint without requiring callers + * to spell out an index signature. + */ +export type AgentRelayEvents = { + // Broker events (multi-listener replacements for the old `on*` fields) + messageReceived: [Message]; + messageSent: [Message]; + agentSpawned: [Agent]; + agentReleased: [Agent]; + agentExited: [Agent]; + agentReady: [Agent]; + workerOutput: [WorkerOutputPayload]; + deliveryUpdate: [BrokerEvent]; + agentExitRequested: [AgentExitRequestedPayload]; + agentIdle: [AgentIdlePayload]; + agentActivityChanged: [AgentActivityChange]; + channelSubscribed: [ChannelSubscriptionPayload]; + channelUnsubscribed: [ChannelSubscriptionPayload]; + + // Call-site hooks (new) + beforeAgentSpawn: [BeforeAgentSpawnContext]; + afterAgentSpawn: [AfterAgentSpawnContext]; + beforeAgentRelease: [BeforeAgentReleaseContext]; + afterAgentRelease: [AfterAgentReleaseContext]; +}; diff --git a/packages/sdk/src/relay.ts b/packages/sdk/src/relay.ts index 93004dfbc..b32b64da2 100644 --- a/packages/sdk/src/relay.ts +++ b/packages/sdk/src/relay.ts @@ -10,8 +10,8 @@ * * const relay = new AgentRelay(); * - * relay.onMessageReceived = (message) => console.log(message); - * relay.onAgentSpawned = (agent) => console.log("spawned", agent.name); + * relay.addListener('messageReceived', (message) => console.log(message)); + * relay.addListener('agentSpawned', (agent) => console.log("spawned", agent.name)); * * const codex = await relay.codex.spawn(); * const human = relay.human({ name: "System" }); @@ -30,6 +30,8 @@ import path from 'node:path'; import { RelayCast } from '@relaycast/sdk'; import { AgentRelayClient, type AgentRelayBrokerInitArgs, type AgentRelaySpawnOptions } from './client.js'; +import { EventBus } from './event-bus.js'; +import type { AgentRelayEvents } from './lifecycle-hooks.js'; import { buildPersonaSpawnSpec, composePersonaTask, @@ -302,11 +304,11 @@ export interface Agent { readonly channels: string[]; /** Current lifecycle status of the agent. */ readonly status: AgentStatus; - /** Set when the agent exits. Available after `onAgentExited` fires. */ + /** Set when the agent exits. Available once the `agentExited` event fires. */ exitCode?: number; - /** Set when the agent exits via signal. Available after `onAgentExited` fires. */ + /** Set when the agent exits via signal. Available once the `agentExited` event fires. */ exitSignal?: string; - /** Set when the agent requests exit via /exit. Available after `onAgentExitRequested` fires. */ + /** Set when the agent requests exit via /exit. Available once the `agentExitRequested` event fires. */ exitReason?: string; release(reasonOrOptions?: string | ReleaseOptions): Promise; waitForReady(timeoutMs?: number): Promise; @@ -436,20 +438,49 @@ interface AgentActivityState { // ── AgentRelay facade ─────────────────────────────────────────────────────── export class AgentRelay { - // Event hooks — assign a callback or null to clear. - onMessageReceived: EventHook = null; - onMessageSent: EventHook = null; - onAgentSpawned: EventHook = null; - onAgentReleased: EventHook = null; - onAgentExited: EventHook = null; - onAgentReady: EventHook = null; - onWorkerOutput: EventHook<{ name: string; stream: string; chunk: string }> = null; - onDeliveryUpdate: EventHook = null; - onAgentExitRequested: EventHook<{ name: string; reason: string }> = null; - onAgentIdle: EventHook<{ name: string; idleSecs: number }> = null; - onAgentActivityChanged: EventHook = null; - onChannelSubscribed: ((agent: string, channels: string[]) => void) | null = null; - onChannelUnsubscribed: ((agent: string, channels: string[]) => void) | null = null; + /** + * Multi-listener event registry. Subscribe via {@link addListener} or + * `bus.addListener` directly; emit happens internally as broker events + * arrive and at SDK call sites for the spawn / release lifecycle hooks. + * + * The bus is shared with the underlying `AgentRelayClient` (created via + * {@link ensureStarted}) so listeners registered on either object see + * the same events. + */ + readonly bus: EventBus = new EventBus(); + + // ── Listener registration ─────────────────────────────────────────────── + + /** + * Register a listener for a relay lifecycle event. Returns an + * unsubscribe function. + * + * Example: + * ```ts + * const off = relay.addListener('agentSpawned', (agent) => console.log(agent.name)); + * // later: + * off(); + * ``` + * + * Replaces the pre-2.x single-callback `on*` fields. Multiple listeners + * can register for the same event; they fire sequentially in + * registration order. Async handlers are awaited. Handler exceptions + * are caught and logged; one bad listener never blocks the others. + */ + addListener( + event: K, + handler: (...args: AgentRelayEvents[K]) => void | Promise + ): () => void { + return this.bus.addListener(event, handler); + } + + /** Remove a previously-registered listener. Idempotent. */ + removeListener( + event: K, + handler: (...args: AgentRelayEvents[K]) => void | Promise + ): void { + this.bus.removeListener(event, handler); + } // ── Public accessors ──────────────────────────────────────────────────── @@ -868,7 +899,7 @@ export class AgentRelay { data: input.data, mode: input.mode, }; - this.onMessageSent?.(msg); + void this.bus.emit('messageSent', msg); return msg; }, }; @@ -1331,7 +1362,7 @@ export class AgentRelay { return; } state.active = active; - this.onAgentActivityChanged?.({ + void this.bus.emit('agentActivityChanged', { name, active, pendingDeliveries: state.pendingDeliveries.size, @@ -1461,6 +1492,7 @@ export class AgentRelay { .then(() => AgentRelayClient.spawn({ ...this.clientOptions, + eventBus: this.bus, onStderr: (line) => { for (const listener of this.stderrListeners) { try { @@ -1520,7 +1552,7 @@ export class AgentRelay { threadId: event.thread_id, mode: event.injection_mode ?? event.mode, }; - this.onMessageReceived?.(msg); + void this.bus.emit('messageReceived', msg); break; } case 'agent_spawned': { @@ -1529,7 +1561,7 @@ export class AgentRelay { this.messageReadyAgents.delete(event.name); this.exitedAgents.delete(event.name); this.idleAgents.delete(event.name); - this.onAgentSpawned?.(agent); + void this.bus.emit('agentSpawned', agent); break; } case 'agent_released': { @@ -1539,7 +1571,7 @@ export class AgentRelay { this.readyAgents.delete(event.name); this.messageReadyAgents.delete(event.name); this.idleAgents.delete(event.name); - this.onAgentReleased?.(agent); + void this.bus.emit('agentReleased', agent); this.knownAgents.delete(event.name); this.outputListeners.delete(event.name); this.exitResolvers.get(event.name)?.resolve('released'); @@ -1561,7 +1593,7 @@ export class AgentRelay { if (event.reason !== undefined) { (agent as { exitReason?: string }).exitReason = event.reason; } - this.onAgentExited?.(agent); + void this.bus.emit('agentExited', agent); this.knownAgents.delete(event.name); this.outputListeners.delete(event.name); this.exitResolvers.get(event.name)?.resolve('exited'); @@ -1573,7 +1605,7 @@ export class AgentRelay { case 'agent_exit': { const agent = this.knownAgents.get(event.name) ?? this.ensureAgentHandle(event.name, 'pty', []); (agent as { exitReason?: string }).exitReason = event.reason; - this.onAgentExitRequested?.({ name: event.name, reason: event.reason }); + void this.bus.emit('agentExitRequested', { name: event.name, reason: event.reason }); break; } case 'worker_ready': { @@ -1581,17 +1613,17 @@ export class AgentRelay { this.readyAgents.add(event.name); this.exitedAgents.delete(event.name); this.idleAgents.delete(event.name); - this.onAgentReady?.(agent); + void this.bus.emit('agentReady', agent); break; } case 'channel_subscribed': { this.addAgentChannels(event.name, event.channels); - this.onChannelSubscribed?.(event.name, event.channels); + void this.bus.emit('channelSubscribed', { agent: event.name, channels: event.channels }); break; } case 'channel_unsubscribed': { this.removeAgentChannels(event.name, event.channels); - this.onChannelUnsubscribed?.(event.name, event.channels); + void this.bus.emit('channelUnsubscribed', { agent: event.name, channels: event.channels }); break; } case 'delivery_queued': { @@ -1656,7 +1688,7 @@ export class AgentRelay { case 'worker_stream': { // Agent producing output is no longer idle this.idleAgents.delete(event.name); - this.onWorkerOutput?.({ + void this.bus.emit('workerOutput', { name: event.name, stream: event.stream, chunk: event.chunk, @@ -1668,7 +1700,7 @@ export class AgentRelay { case 'agent_idle': { this.clearAgentDeliveries(event.name, 'agent_idle'); this.idleAgents.add(event.name); - this.onAgentIdle?.({ + void this.bus.emit('agentIdle', { name: event.name, idleSecs: event.idle_secs, }); @@ -1679,7 +1711,7 @@ export class AgentRelay { } } if (event.kind.startsWith('delivery_') || event.kind.startsWith('message_delivery_')) { - this.onDeliveryUpdate?.(event); + void this.bus.emit('deliveryUpdate', event); } }); } @@ -1858,7 +1890,7 @@ export class AgentRelay { data: input.data, mode: input.mode, }; - relay.onMessageSent?.(msg); + void relay.bus.emit('messageSent', msg); return msg; }, async subscribe(channelsToAdd: string[]) { diff --git a/packages/sdk/src/shadow.ts b/packages/sdk/src/shadow.ts index f4dbe1752..762aa721c 100644 --- a/packages/sdk/src/shadow.ts +++ b/packages/sdk/src/shadow.ts @@ -16,7 +16,7 @@ * shadows.bind("Reviewer", "Worker1"); * * // Wire into relay events - * relay.onMessageReceived = (msg) => { + * relay.addListener('messageReceived', (msg) => { * const copies = shadows.intercept(msg.from, msg.to, msg); * for (const copy of copies) { * // Forward shadow copies to the shadow agent @@ -25,7 +25,7 @@ * text: `[Shadow of ${copy.primaryAgent}] ${msg.text}`, * }); * } - * }; + * }); * ``` */ @@ -50,7 +50,7 @@ export interface ShadowRelationship extends ShadowConfig { export interface ShadowCopy { shadowAgent: string; primaryAgent: string; - direction: "incoming" | "outgoing"; + direction: 'incoming' | 'outgoing'; } // ── Manager ────────────────────────────────────────────────────────────────── @@ -71,7 +71,7 @@ export class ShadowManager { speakOn?: SpeakOnTrigger[]; receiveIncoming?: boolean; receiveOutgoing?: boolean; - } = {}, + } = {} ): void { // Clean up any existing binding for this shadow this.unbind(shadowAgent); @@ -79,7 +79,7 @@ export class ShadowManager { const relationship: ShadowRelationship = { shadowAgent, primaryAgent, - speakOn: options.speakOn ?? ["EXPLICIT_ASK"], + speakOn: options.speakOn ?? ['EXPLICIT_ASK'], receiveIncoming: options.receiveIncoming ?? true, receiveOutgoing: options.receiveOutgoing ?? true, }; @@ -139,16 +139,14 @@ export class ShadowManager { const rel = shadows.find((s) => s.shadowAgent === shadowAgent); if (!rel) return true; - return ( - rel.speakOn.includes(trigger) || rel.speakOn.includes("ALL_MESSAGES") - ); + return rel.speakOn.includes(trigger) || rel.speakOn.includes('ALL_MESSAGES'); } /** * Determine which shadows should receive a copy of a message * between `from` and `to`. Returns a list of shadow copies to deliver. * - * Call this from your `onMessageReceived` / `onMessageSent` hooks + * Call this from your `messageReceived` / `messageSent` listeners * to fan out shadow copies. */ intercept(from: string, to: string): ShadowCopy[] { @@ -162,14 +160,14 @@ export class ShadowManager { copies.push({ shadowAgent: s.shadowAgent, primaryAgent: from, - direction: "outgoing", + direction: 'outgoing', }); } } } // Incoming shadows of the recipient - if (to && to !== "*") { + if (to && to !== '*') { const recipientShadows = this.shadowsByPrimary.get(to); if (recipientShadows) { for (const s of recipientShadows) { @@ -177,7 +175,7 @@ export class ShadowManager { copies.push({ shadowAgent: s.shadowAgent, primaryAgent: to, - direction: "incoming", + direction: 'incoming', }); } } @@ -194,16 +192,10 @@ export class ShadowManager { * (e.g. CODE_WRITTEN, REVIEW_REQUEST), call this to find which * shadows should receive a notification. */ - emitTrigger( - primaryAgent: string, - trigger: SpeakOnTrigger, - ): string[] { + emitTrigger(primaryAgent: string, trigger: SpeakOnTrigger): string[] { const shadows = this.shadowsByPrimary.get(primaryAgent) ?? []; return shadows - .filter( - (s) => - s.speakOn.includes(trigger) || s.speakOn.includes("ALL_MESSAGES"), - ) + .filter((s) => s.speakOn.includes(trigger) || s.speakOn.includes('ALL_MESSAGES')) .map((s) => s.shadowAgent); } diff --git a/packages/sdk/src/spawn-from-env.ts b/packages/sdk/src/spawn-from-env.ts index 954a4cf66..e520f05a3 100644 --- a/packages/sdk/src/spawn-from-env.ts +++ b/packages/sdk/src/spawn-from-env.ts @@ -12,8 +12,8 @@ * ``` */ -import { AgentRelay } from "./relay.js"; -import { getCliDefinition } from "./cli-registry.js"; +import { AgentRelay } from './relay.js'; +import { getCliDefinition } from './cli-registry.js'; // ── Types ────────────────────────────────────────────────────────────────── @@ -83,22 +83,18 @@ function getBypassFlagConfig(cli: string): BypassFlagConfig | undefined { * Parse and validate spawn environment variables. * Throws with a clear message on missing required keys. */ -export function parseSpawnEnv( - env: Record = process.env, -): SpawnEnvInput { +export function parseSpawnEnv(env: Record = process.env): SpawnEnvInput { const AGENT_NAME = env.AGENT_NAME; const AGENT_CLI = env.AGENT_CLI; const RELAY_API_KEY = env.RELAY_API_KEY; const missing: string[] = []; - if (!AGENT_NAME) missing.push("AGENT_NAME"); - if (!AGENT_CLI) missing.push("AGENT_CLI"); - if (!RELAY_API_KEY) missing.push("RELAY_API_KEY"); + if (!AGENT_NAME) missing.push('AGENT_NAME'); + if (!AGENT_CLI) missing.push('AGENT_CLI'); + if (!RELAY_API_KEY) missing.push('RELAY_API_KEY'); if (missing.length > 0) { - throw new Error( - `[spawn-from-env] Missing required environment variables: ${missing.join(", ")}`, - ); + throw new Error(`[spawn-from-env] Missing required environment variables: ${missing.join(', ')}`); } return { @@ -124,7 +120,7 @@ export function parseSpawnEnv( function parseArgs(raw: string | undefined): string[] { if (!raw) return []; const trimmed = raw.trim(); - if (trimmed.startsWith("[")) { + if (trimmed.startsWith('[')) { try { const parsed = JSON.parse(trimmed); if (Array.isArray(parsed)) return parsed.map(String); @@ -142,19 +138,17 @@ function parseArgs(raw: string | undefined): string[] { export function resolveSpawnPolicy(input: SpawnEnvInput): SpawnPolicyResult { const extraArgs = parseArgs(input.AGENT_ARGS); const channels = input.AGENT_CHANNELS - ? input.AGENT_CHANNELS.split(",") + ? input.AGENT_CHANNELS.split(',') .map((c) => c.trim()) .filter(Boolean) - : ["general"]; + : ['general']; - const disableBypass = input.AGENT_DISABLE_DEFAULT_BYPASS === "1"; + const disableBypass = input.AGENT_DISABLE_DEFAULT_BYPASS === '1'; const bypassConfig = getBypassFlagConfig(input.AGENT_CLI); let bypassApplied = false; const args = [...extraArgs]; - const bypassValues = bypassConfig - ? [bypassConfig.flag, ...(bypassConfig.aliases ?? [])] - : []; + const bypassValues = bypassConfig ? [bypassConfig.flag, ...(bypassConfig.aliases ?? [])] : []; const hasBypassAlready = bypassValues.some((value) => args.includes(value)); if (bypassConfig && !disableBypass && !hasBypassAlready) { @@ -190,21 +184,18 @@ export function resolveSpawnPolicy(input: SpawnEnvInput): SpawnPolicyResult { * Creates a broker, spawns the agent via PTY, and waits for exit. * Returns the exit reason and exit code. */ -export async function spawnFromEnv( - options: SpawnFromEnvOptions = {}, -): Promise { - const env = - options.env ?? (process.env as Record); +export async function spawnFromEnv(options: SpawnFromEnvOptions = {}): Promise { + const env = options.env ?? (process.env as Record); const parsed = parseSpawnEnv(env); const policy = resolveSpawnPolicy(parsed); console.log( `[spawn-from-env] Spawning agent: name=${policy.name} cli=${policy.cli} ` + - `channels=${policy.channels.join(",")} bypass=${policy.bypassApplied}`, + `channels=${policy.channels.join(',')} bypass=${policy.bypassApplied}` ); if (policy.task) { console.log( - `[spawn-from-env] Task: ${policy.task.slice(0, 200)}${policy.task.length > 200 ? "..." : ""}`, + `[spawn-from-env] Task: ${policy.task.slice(0, 200)}${policy.task.length > 200 ? '...' : ''}` ); } @@ -216,18 +207,18 @@ export async function spawnFromEnv( env: env as NodeJS.ProcessEnv, }); - relay.onAgentSpawned = (agent) => { + relay.addListener('agentSpawned', (agent) => { console.log(`[spawn-from-env] Agent spawned: ${agent.name}`); - }; - relay.onAgentReady = (agent) => { + }); + relay.addListener('agentReady', (agent) => { console.log(`[spawn-from-env] Agent ready: ${agent.name}`); - }; - relay.onAgentExited = (agent) => { + }); + relay.addListener('agentExited', (agent) => { console.log( `[spawn-from-env] Agent exited: ${agent.name} ` + - `code=${agent.exitCode ?? "none"} signal=${agent.exitSignal ?? "none"}`, + `code=${agent.exitCode ?? 'none'} signal=${agent.exitSignal ?? 'none'}` ); - }; + }); try { const agent = await relay.spawnPty({ diff --git a/packages/sdk/src/workflows/__tests__/budget-enforcement.test.ts b/packages/sdk/src/workflows/__tests__/budget-enforcement.test.ts index 4a54b5ca8..7b7fd9a7b 100644 --- a/packages/sdk/src/workflows/__tests__/budget-enforcement.test.ts +++ b/packages/sdk/src/workflows/__tests__/budget-enforcement.test.ts @@ -104,13 +104,7 @@ const mockRelayInstance = { onBrokerStderr: vi.fn().mockReturnValue(() => {}), listAgentsRaw: vi.fn().mockResolvedValue([]), listAgents: vi.fn().mockResolvedValue([]), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentReleased: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, - onDeliveryUpdate: null as any, + addListener: vi.fn(() => () => {}), }; vi.mock('@relaycast/sdk', () => ({ diff --git a/packages/sdk/src/workflows/__tests__/e2e-permissions.test.ts b/packages/sdk/src/workflows/__tests__/e2e-permissions.test.ts index ba27a71ce..257a802fe 100644 --- a/packages/sdk/src/workflows/__tests__/e2e-permissions.test.ts +++ b/packages/sdk/src/workflows/__tests__/e2e-permissions.test.ts @@ -193,15 +193,19 @@ const mockHuman = { sendMessage: vi.fn().mockResolvedValue(undefined), }; +const mockListeners = new Map void>>(); +function emitMockEvent(event: string, ...args: any[]): void { + const set = mockListeners.get(event); + if (set) for (const cb of set) cb(...args); +} + const defaultSpawnPtyImplementation = async ({ name, task }: { name: string; task?: string }) => { const queued = mockSpawnOutputs.shift(); const stepComplete = task?.match(/STEP_COMPLETE:([^\n]+)/)?.[1]?.trim(); const output = queued ?? (stepComplete ? `STEP_COMPLETE:${stepComplete}\n` : 'STEP_COMPLETE:unknown\n'); queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } + emitMockEvent('workerOutput', { name, chunk: output }); }); return { ...mockAgent, name }; @@ -212,13 +216,15 @@ const mockRelayInstance = { human: vi.fn().mockReturnValue(mockHuman), shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentReleased: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, - onDeliveryUpdate: null as any, + addListener: vi.fn((event: string, cb: (...args: any[]) => void) => { + let set = mockListeners.get(event); + if (!set) { + set = new Set(); + mockListeners.set(event, set); + } + set.add(cb); + return () => set!.delete(cb); + }), listAgentsRaw: vi.fn().mockResolvedValue([]), }; @@ -307,7 +313,7 @@ describe('WorkflowRunner permissions integration', () => { mockSpawnOutputs = []; mockAgent.release.mockResolvedValue(undefined); mockRelayInstance.spawnPty.mockImplementation(defaultSpawnPtyImplementation); - mockRelayInstance.onWorkerOutput = null; + mockListeners.clear(); lastProvisionCall = null; lastProvisionResult = null; workspaceDir = createWorkspace(); diff --git a/packages/sdk/src/workflows/__tests__/permissions-integration.test.ts b/packages/sdk/src/workflows/__tests__/permissions-integration.test.ts index 9f6483889..3fe7690a2 100644 --- a/packages/sdk/src/workflows/__tests__/permissions-integration.test.ts +++ b/packages/sdk/src/workflows/__tests__/permissions-integration.test.ts @@ -120,19 +120,27 @@ const mockHuman = { sendMessage: vi.fn().mockResolvedValue(undefined), }; +const mockListeners = new Map void>>(); +function emitMockEvent(event: string, ...args: any[]): void { + const set = mockListeners.get(event); + if (set) for (const cb of set) cb(...args); +} + const mockRelayInstance = { spawnPty: vi.fn(), human: vi.fn().mockReturnValue(mockHuman), shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), listAgentsRaw: vi.fn().mockResolvedValue([]), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentReleased: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, - onDeliveryUpdate: null as any, + addListener: vi.fn((event: string, cb: (...args: any[]) => void) => { + let set = mockListeners.get(event); + if (!set) { + set = new Set(); + mockListeners.set(event, set); + } + set.add(cb); + return () => set!.delete(cb); + }), }; const defaultSpawnPtyImplementation = async ({ name, task }: { name: string; task?: string }) => { @@ -141,7 +149,7 @@ const defaultSpawnPtyImplementation = async ({ name, task }: { name: string; tas const output = queued ?? (stepComplete ? `STEP_COMPLETE:${stepComplete}\n` : 'STEP_COMPLETE:done\n'); queueMicrotask(() => { - mockRelayInstance.onWorkerOutput?.({ name, chunk: output }); + emitMockEvent('workerOutput', { name, chunk: output }); }); return { ...mockAgent, name }; @@ -308,7 +316,7 @@ beforeEach(() => { waitForIdleFn = vi.fn().mockImplementation(() => never()); mockAgent.release.mockResolvedValue(undefined); mockRelayInstance.spawnPty.mockImplementation(defaultSpawnPtyImplementation); - mockRelayInstance.onWorkerOutput = null; + mockListeners.clear(); }); afterEach(() => { diff --git a/packages/sdk/src/workflows/__tests__/verification-custom.test.ts b/packages/sdk/src/workflows/__tests__/verification-custom.test.ts index 8c09f701d..0c737cc90 100644 --- a/packages/sdk/src/workflows/__tests__/verification-custom.test.ts +++ b/packages/sdk/src/workflows/__tests__/verification-custom.test.ts @@ -40,13 +40,7 @@ const mockRelayInstance = { shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), listAgentsRaw: vi.fn().mockResolvedValue([]), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentReleased: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, - onDeliveryUpdate: null as any, + addListener: vi.fn(() => () => {}), }; vi.mock('../../relay.js', () => ({ diff --git a/packages/sdk/src/workflows/__tests__/verification-traceback.test.ts b/packages/sdk/src/workflows/__tests__/verification-traceback.test.ts index 6597c1974..4661ad932 100644 --- a/packages/sdk/src/workflows/__tests__/verification-traceback.test.ts +++ b/packages/sdk/src/workflows/__tests__/verification-traceback.test.ts @@ -129,13 +129,7 @@ const mockRelayInstance = { onBrokerStderr: vi.fn().mockReturnValue(() => {}), listAgentsRaw: vi.fn().mockResolvedValue([]), listAgents: vi.fn().mockResolvedValue([]), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentReleased: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, - onDeliveryUpdate: null as any, + addListener: vi.fn(() => () => {}), }; vi.mock('@relaycast/sdk', () => ({ diff --git a/packages/sdk/src/workflows/runner.ts b/packages/sdk/src/workflows/runner.ts index 634cc2761..1acdeb77e 100644 --- a/packages/sdk/src/workflows/runner.ts +++ b/packages/sdk/src/workflows/runner.ts @@ -532,6 +532,7 @@ export class WorkflowRunner { private runStartTime?: number; /** Unsubscribe handle for broker stderr listener wired during a run. */ private unsubBrokerStderr?: () => void; + private unsubRelayListeners: Array<() => void> = []; /** Tracks last idle log time per agent to debounce idle warnings (30s multiples). */ private readonly lastIdleLog = new Map(); /** Tracks last logged activity type per agent to avoid duplicate status lines. */ @@ -3152,165 +3153,179 @@ export class WorkflowRunner { }); // Wire PTY output dispatcher — routes chunks to per-agent listeners + activity logging - this.relay.onWorkerOutput = ({ name, chunk }) => { - const listener = this.ptyListeners.get(name); - if (listener) listener(chunk); - - // Parse PTY output for high-signal activity - const stripped = WorkflowRunner.stripAnsi(chunk); - const shortName = name.replace(/-[a-f0-9]{6,}$/, ''); - let activity: string | undefined; - if (/Read\(/.test(stripped)) { - // Extract filename — path may be truncated at chunk boundary so require - // at least a dir separator or 8+ chars to trust the basename. - const m = stripped.match(/Read\(\s*~?([^\s)"']{8,})/); - if (m) { - const base = path.basename(m[1]); - activity = base.length >= 3 ? `Reading ${base}` : 'Reading file...'; - } else { - activity = 'Reading file...'; + this.unsubRelayListeners.push( + this.relay.addListener('workerOutput', ({ name, chunk }) => { + const listener = this.ptyListeners.get(name); + if (listener) listener(chunk); + + // Parse PTY output for high-signal activity + const stripped = WorkflowRunner.stripAnsi(chunk); + const shortName = name.replace(/-[a-f0-9]{6,}$/, ''); + let activity: string | undefined; + if (/Read\(/.test(stripped)) { + // Extract filename — path may be truncated at chunk boundary so require + // at least a dir separator or 8+ chars to trust the basename. + const m = stripped.match(/Read\(\s*~?([^\s)"']{8,})/); + if (m) { + const base = path.basename(m[1]); + activity = base.length >= 3 ? `Reading ${base}` : 'Reading file...'; + } else { + activity = 'Reading file...'; + } + } else if (/Edit\(/.test(stripped)) { + const m = stripped.match(/Edit\(\s*~?([^\s)"']{8,})/); + if (m) { + const base = path.basename(m[1]); + activity = base.length >= 3 ? `Editing ${base}` : 'Editing file...'; + } else { + activity = 'Editing file...'; + } + } else if (/Bash\(/.test(stripped)) { + // Extract a short preview of the command + const m = stripped.match(/Bash\(\s*(.{1,40})/); + activity = m ? `Running: ${m[1].trim()}...` : 'Running command...'; + } else if (/Explore\(/.test(stripped)) { + const m = stripped.match(/Explore\(\s*(.{1,50})/); + activity = m ? `Exploring: ${m[1].replace(/\).*/, '').trim()}` : 'Exploring codebase...'; + } else if (/Task\(/.test(stripped)) { + activity = 'Running sub-agent...'; + } else if (/Sublimating|Thinking|Coalescing|Cultivating/.test(stripped)) { + const m = stripped.match(/(\d+)s/); + activity = m ? `Thinking... (${m[1]}s)` : 'Thinking...'; } - } else if (/Edit\(/.test(stripped)) { - const m = stripped.match(/Edit\(\s*~?([^\s)"']{8,})/); - if (m) { - const base = path.basename(m[1]); - activity = base.length >= 3 ? `Editing ${base}` : 'Editing file...'; - } else { - activity = 'Editing file...'; + if (activity && this.lastActivity.get(name) !== activity) { + this.lastActivity.set(name, activity); + this.log(`[${shortName}] ${activity}`); } - } else if (/Bash\(/.test(stripped)) { - // Extract a short preview of the command - const m = stripped.match(/Bash\(\s*(.{1,40})/); - activity = m ? `Running: ${m[1].trim()}...` : 'Running command...'; - } else if (/Explore\(/.test(stripped)) { - const m = stripped.match(/Explore\(\s*(.{1,50})/); - activity = m ? `Exploring: ${m[1].replace(/\).*/, '').trim()}` : 'Exploring codebase...'; - } else if (/Task\(/.test(stripped)) { - activity = 'Running sub-agent...'; - } else if (/Sublimating|Thinking|Coalescing|Cultivating/.test(stripped)) { - const m = stripped.match(/(\d+)s/); - activity = m ? `Thinking... (${m[1]}s)` : 'Thinking...'; - } - if (activity && this.lastActivity.get(name) !== activity) { - this.lastActivity.set(name, activity); - this.log(`[${shortName}] ${activity}`); - } - }; + }) + ); // Wire relay event hooks for rich console logging - this.relay.onMessageReceived = (msg) => { - this.emit({ - type: 'broker:event', - runId, - event: { - kind: 'relay_inbound', - event_id: msg.eventId, - from: msg.from, - target: msg.to, - body: msg.text, - thread_id: msg.threadId, - } as BrokerEvent, - }); - const body = msg.text.length > 120 ? msg.text.slice(0, 117) + '...' : msg.text; - const fromShort = msg.from.replace(/-[a-f0-9]{6,}$/, ''); - const toShort = msg.to.replace(/-[a-f0-9]{6,}$/, ''); - this.log(`[msg] ${fromShort} → ${toShort}: ${body}`); - - if (this.channel && (msg.to === this.channel || msg.to === `#${this.channel}`)) { - const runtimeAgent = this.runtimeStepAgents.get(msg.from); - this.recordChannelEvidence(msg.text, { - sender: runtimeAgent?.logicalName ?? msg.from, - actor: msg.from, - role: runtimeAgent?.role, - target: msg.to, - origin: 'relay_message', - stepName: runtimeAgent?.stepName, + this.unsubRelayListeners.push( + this.relay.addListener('messageReceived', (msg) => { + this.emit({ + type: 'broker:event', + runId, + event: { + kind: 'relay_inbound', + event_id: msg.eventId, + from: msg.from, + target: msg.to, + body: msg.text, + thread_id: msg.threadId, + } as BrokerEvent, }); - } + const body = msg.text.length > 120 ? msg.text.slice(0, 117) + '...' : msg.text; + const fromShort = msg.from.replace(/-[a-f0-9]{6,}$/, ''); + const toShort = msg.to.replace(/-[a-f0-9]{6,}$/, ''); + this.log(`[msg] ${fromShort} → ${toShort}: ${body}`); + + if (this.channel && (msg.to === this.channel || msg.to === `#${this.channel}`)) { + const runtimeAgent = this.runtimeStepAgents.get(msg.from); + this.recordChannelEvidence(msg.text, { + sender: runtimeAgent?.logicalName ?? msg.from, + actor: msg.from, + role: runtimeAgent?.role, + target: msg.to, + origin: 'relay_message', + stepName: runtimeAgent?.stepName, + }); + } - const supervision = this.supervisedRuntimeAgents.get(msg.from); - if (supervision?.role === 'owner') { - this.recordStepToolSideEffect(supervision.stepName, { - type: 'owner_monitoring', - detail: `Owner messaged ${msg.to}: ${msg.text.slice(0, 120)}`, - raw: { to: msg.to, text: msg.text }, - }); - void this.trajectory?.ownerMonitoringEvent( - supervision.stepName, - supervision.logicalName, - `Messaged ${msg.to}: ${msg.text.slice(0, 120)}`, - { to: msg.to, text: msg.text } - ); - } - }; + const supervision = this.supervisedRuntimeAgents.get(msg.from); + if (supervision?.role === 'owner') { + this.recordStepToolSideEffect(supervision.stepName, { + type: 'owner_monitoring', + detail: `Owner messaged ${msg.to}: ${msg.text.slice(0, 120)}`, + raw: { to: msg.to, text: msg.text }, + }); + void this.trajectory?.ownerMonitoringEvent( + supervision.stepName, + supervision.logicalName, + `Messaged ${msg.to}: ${msg.text.slice(0, 120)}`, + { to: msg.to, text: msg.text } + ); + } + }) + ); - this.relay.onAgentSpawned = (agent) => { - this.emit({ - type: 'broker:event', - runId, - event: { - kind: 'agent_spawned', - name: agent.name, - runtime: agent.runtime, - } as BrokerEvent, - }); - // Skip agents already managed by step execution - if (!this.activeAgentHandles.has(agent.name)) { - this.log(`[spawned] ${agent.name} (${agent.runtime})`); - } - }; + this.unsubRelayListeners.push( + this.relay.addListener('agentSpawned', (agent) => { + this.emit({ + type: 'broker:event', + runId, + event: { + kind: 'agent_spawned', + name: agent.name, + runtime: agent.runtime, + } as BrokerEvent, + }); + // Skip agents already managed by step execution + if (!this.activeAgentHandles.has(agent.name)) { + this.log(`[spawned] ${agent.name} (${agent.runtime})`); + } + }) + ); - this.relay.onAgentReleased = (agent) => { - this.emit({ - type: 'broker:event', - runId, - event: { - kind: 'agent_released', - name: agent.name, - } as BrokerEvent, - }); - }; + this.unsubRelayListeners.push( + this.relay.addListener('agentReleased', (agent) => { + this.emit({ + type: 'broker:event', + runId, + event: { + kind: 'agent_released', + name: agent.name, + } as BrokerEvent, + }); + }) + ); - this.relay.onAgentExited = (agent) => { - this.emit({ - type: 'broker:event', - runId, - event: { - kind: 'agent_exited', - name: agent.name, - code: agent.exitCode, - signal: agent.exitSignal, - } as BrokerEvent, - }); - this.lastActivity.delete(agent.name); - this.lastIdleLog.delete(agent.name); - if (!this.activeAgentHandles.has(agent.name)) { - this.log(`[exited] ${agent.name} (code: ${agent.exitCode ?? '?'})`); - } - }; + this.unsubRelayListeners.push( + this.relay.addListener('agentExited', (agent) => { + this.emit({ + type: 'broker:event', + runId, + event: { + kind: 'agent_exited', + name: agent.name, + code: agent.exitCode, + signal: agent.exitSignal, + } as BrokerEvent, + }); + this.lastActivity.delete(agent.name); + this.lastIdleLog.delete(agent.name); + if (!this.activeAgentHandles.has(agent.name)) { + this.log(`[exited] ${agent.name} (code: ${agent.exitCode ?? '?'})`); + } + }) + ); - this.relay.onDeliveryUpdate = (event) => { - this.emit({ type: 'broker:event', runId, event }); - }; + this.unsubRelayListeners.push( + this.relay.addListener('deliveryUpdate', (event) => { + this.emit({ type: 'broker:event', runId, event }); + }) + ); - this.relay.onAgentIdle = ({ name, idleSecs }) => { - this.emit({ - type: 'broker:event', - runId, - event: { - kind: 'agent_idle', - name, - idle_secs: idleSecs, - } as BrokerEvent, - }); - // Only log at 30s multiples to avoid watchdog spam - const bucket = Math.floor(idleSecs / 30) * 30; - if (bucket >= 30 && this.lastIdleLog.get(name) !== bucket) { - this.lastIdleLog.set(name, bucket); - const shortName = name.replace(/-[a-f0-9]{6,}$/, ''); - this.log(`[idle] ${shortName} silent for ${bucket}s`); - } - }; + this.unsubRelayListeners.push( + this.relay.addListener('agentIdle', ({ name, idleSecs }) => { + this.emit({ + type: 'broker:event', + runId, + event: { + kind: 'agent_idle', + name, + idle_secs: idleSecs, + } as BrokerEvent, + }); + // Only log at 30s multiples to avoid watchdog spam + const bucket = Math.floor(idleSecs / 30) * 30; + if (bucket >= 30 && this.lastIdleLog.get(name) !== bucket) { + this.lastIdleLog.set(name, bucket); + const shortName = name.replace(/-[a-f0-9]{6,}$/, ''); + this.log(`[idle] ${shortName} silent for ${bucket}s`); + } + }) + ); this.relaycast = undefined; this.relaycastAgent = undefined; @@ -3451,16 +3466,15 @@ export class WorkflowRunner { this.unsubBrokerStderr?.(); this.unsubBrokerStderr = undefined; - // Null out relay event hooks to prevent leaks - if (this.relay) { - this.relay.onMessageReceived = null; - this.relay.onAgentSpawned = null; - this.relay.onAgentReleased = null; - this.relay.onAgentExited = null; - this.relay.onAgentIdle = null; - this.relay.onWorkerOutput = null; - this.relay.onDeliveryUpdate = null; + // Unsubscribe relay event listeners to prevent leaks + for (const off of this.unsubRelayListeners) { + try { + off(); + } catch { + /* ignore */ + } } + this.unsubRelayListeners = []; this.lastIdleLog.clear(); this.lastActivity.clear(); this.supervisedRuntimeAgents.clear(); diff --git a/scripts/spawn-reviewers.ts b/scripts/spawn-reviewers.ts index 984db6e47..09295a0c5 100644 --- a/scripts/spawn-reviewers.ts +++ b/scripts/spawn-reviewers.ts @@ -11,18 +11,18 @@ async function main() { }); // Listen for all events - relay.onDeliveryUpdate = (event) => { + relay.addListener('deliveryUpdate', (event) => { console.log(`[delivery] ${event.kind}:`, JSON.stringify(event)); - }; - relay.onMessageReceived = (msg) => { + }); + relay.addListener('messageReceived', (msg) => { console.log(`\n[MSG ${msg.from} -> ${msg.to}]: ${msg.text}\n`); - }; - relay.onAgentSpawned = (agent: Agent) => { + }); + relay.addListener('agentSpawned', (agent: Agent) => { console.log(`[spawned] ${agent.name}`); - }; - relay.onAgentExited = (agent: Agent, code?: number) => { - console.log(`[exited] ${agent.name} code=${code}`); - }; + }); + relay.addListener('agentExited', (agent: Agent) => { + console.log(`[exited] ${agent.name} code=${agent.exitCode ?? 'none'}`); + }); const human = relay.human({ name: 'Human' }); diff --git a/tests/workflows/run-e2e-owner-review.ts b/tests/workflows/run-e2e-owner-review.ts index 1cef2f109..70df9ec0b 100644 --- a/tests/workflows/run-e2e-owner-review.ts +++ b/tests/workflows/run-e2e-owner-review.ts @@ -21,7 +21,11 @@ import { fileURLToPath } from 'node:url'; // from the SDK directory, use the SDK vitest config which includes src/__tests__. // From repo root, the aliases resolve correctly. import type { WorkflowDb } from '../../packages/sdk/src/workflows/runner.js'; -import type { RelayYamlConfig, WorkflowRunRow, WorkflowStepRow } from '../../packages/sdk/src/workflows/types.js'; +import type { + RelayYamlConfig, + WorkflowRunRow, + WorkflowStepRow, +} from '../../packages/sdk/src/workflows/types.js'; // ── Mock fetch ────────────────────────────────────────────────────────────── @@ -72,8 +76,12 @@ let mockSpawnOutputs: string[] = []; const mockAgent = { name: 'test-agent-abc', - get waitForExit() { return waitForExitFn; }, - get waitForIdle() { return waitForIdleFn; }, + get waitForExit() { + return waitForExitFn; + }, + get waitForIdle() { + return waitForIdleFn; + }, release: vi.fn().mockResolvedValue(undefined), }; @@ -82,6 +90,12 @@ const mockHuman = { sendMessage: vi.fn().mockResolvedValue(undefined), }; +const mockListeners = new Map void>>(); +function emitMockEvent(event: string, ...args: any[]): void { + const set = mockListeners.get(event); + if (set) for (const cb of set) cb(...args); +} + const mockRelayInstance = { spawnPty: vi.fn().mockImplementation(async ({ name, task }: { name: string; task?: string }) => { const queued = mockSpawnOutputs.shift(); @@ -96,9 +110,7 @@ const mockRelayInstance = { : 'STEP_COMPLETE:unknown\n'); queueMicrotask(() => { - if (typeof mockRelayInstance.onWorkerOutput === 'function') { - mockRelayInstance.onWorkerOutput({ name, chunk: output }); - } + emitMockEvent('workerOutput', { name, chunk: output }); }); return { ...mockAgent, name }; @@ -106,11 +118,15 @@ const mockRelayInstance = { human: vi.fn().mockReturnValue(mockHuman), shutdown: vi.fn().mockResolvedValue(undefined), onBrokerStderr: vi.fn().mockReturnValue(() => {}), - onWorkerOutput: null as ((frame: { name: string; chunk: string }) => void) | null, - onMessageReceived: null as any, - onAgentSpawned: null as any, - onAgentExited: null as any, - onAgentIdle: null as any, + addListener: vi.fn((event: string, cb: (...args: any[]) => void) => { + let set = mockListeners.get(event); + if (!set) { + set = new Set(); + mockListeners.set(event, set); + } + set.add(cb); + return () => set!.delete(cb); + }), listAgentsRaw: vi.fn().mockResolvedValue([]), }; @@ -127,7 +143,9 @@ function makeDb(): WorkflowDb { const runs = new Map(); const steps = new Map(); return { - insertRun: vi.fn(async (run: WorkflowRunRow) => { runs.set(run.id, { ...run }); }), + insertRun: vi.fn(async (run: WorkflowRunRow) => { + runs.set(run.id, { ...run }); + }), updateRun: vi.fn(async (id: string, patch: Partial) => { const existing = runs.get(id); if (existing) runs.set(id, { ...existing, ...patch }); @@ -136,7 +154,9 @@ function makeDb(): WorkflowDb { const run = runs.get(id); return run ? { ...run } : null; }), - insertStep: vi.fn(async (step: WorkflowStepRow) => { steps.set(step.id, { ...step }); }), + insertStep: vi.fn(async (step: WorkflowStepRow) => { + steps.set(step.id, { ...step }); + }), updateStep: vi.fn(async (id: string, patch: Partial) => { const existing = steps.get(id); if (existing) steps.set(id, { ...existing, ...patch }); @@ -185,7 +205,7 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { waitForExitFn = vi.fn().mockResolvedValue('exited'); waitForIdleFn = vi.fn().mockImplementation(() => never()); mockSpawnOutputs = []; - mockRelayInstance.onWorkerOutput = null; + mockListeners.clear(); db = makeDb(); runner = new WorkflowRunner({ db, workspaceId: 'ws-test' }); }); @@ -207,10 +227,12 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { { name: 'team-lead', cli: 'claude', role: 'Lead coordinator for the workflow' }, { name: 'quality-reviewer', cli: 'claude', role: 'reviewer' }, ], - workflows: [{ - name: 'default', - steps: [{ name: 'hub-owner-test', agent: 'impl-worker', task: 'List 3 benefits' }], - }], + workflows: [ + { + name: 'default', + steps: [{ name: 'hub-owner-test', agent: 'impl-worker', task: 'List 3 benefits' }], + }, + ], }); const run = await runner.execute(config, 'default'); @@ -233,10 +255,12 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { { name: 'lead-bot', cli: 'claude', role: 'lead' }, { name: 'reviewer-1', cli: 'claude', role: 'reviewer' }, ], - workflows: [{ - name: 'default', - steps: [{ name: 'step-1', agent: 'specialist', task: 'Do work' }], - }], + workflows: [ + { + name: 'default', + steps: [{ name: 'step-1', agent: 'specialist', task: 'Do work' }], + }, + ], }); const run = await runner.execute(config, 'default'); @@ -262,10 +286,12 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { { name: 'github-integration', cli: 'claude', role: 'GitHub integration agent' }, { name: 'reviewer-1', cli: 'claude', role: 'reviewer' }, ], - workflows: [{ - name: 'default', - steps: [{ name: 'github-no-hub', agent: 'specialist', task: 'Test word boundary' }], - }], + workflows: [ + { + name: 'default', + steps: [{ name: 'github-no-hub', agent: 'specialist', task: 'Test word boundary' }], + }, + ], }); const run = await runner.execute(config, 'default'); @@ -289,10 +315,12 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { { name: 'github-bot', cli: 'claude', role: 'github integration' }, { name: 'reviewer-1', cli: 'claude', role: 'reviewer' }, ], - workflows: [{ - name: 'default', - steps: [{ name: 'step-1', agent: 'specialist', task: 'Do work' }], - }], + workflows: [ + { + name: 'default', + steps: [{ name: 'step-1', agent: 'specialist', task: 'Do work' }], + }, + ], }); const run = await runner.execute(config, 'default'); @@ -360,10 +388,7 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { }, 15000); it('should fail closed when review output is malformed (no REVIEW_DECISION)', async () => { - mockSpawnOutputs = [ - 'STEP_COMPLETE:step-1\n', - 'REVIEW_REASON: this is missing the decision line\n', - ]; + mockSpawnOutputs = ['STEP_COMPLETE:step-1\n', 'REVIEW_REASON: this is missing the decision line\n']; const run = await runner.execute(makeConfig(), 'default'); expect(run.status).toBe('failed'); @@ -395,10 +420,12 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { describe('Scenario 5: Review timeout budgeting', () => { it('should not allocate review timeout longer than parent step timeout', async () => { const config = makeConfig({ - workflows: [{ - name: 'default', - steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1', timeoutMs: 30_000 }], - }], + workflows: [ + { + name: 'default', + steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1', timeoutMs: 30_000 }], + }, + ], }); const run = await runner.execute(config, 'default'); @@ -414,10 +441,12 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { it('should use proportional timeout (1/3) for longer step timeouts', async () => { const config = makeConfig({ - workflows: [{ - name: 'default', - steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1', timeoutMs: 900_000 }], - }], + workflows: [ + { + name: 'default', + steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1', timeoutMs: 900_000 }], + }, + ], }); const run = await runner.execute(config, 'default'); @@ -433,10 +462,12 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { it('should cap review timeout at 600s upper bound', async () => { const config = makeConfig({ - workflows: [{ - name: 'default', - steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1', timeoutMs: 3_600_000 }], - }], + workflows: [ + { + name: 'default', + steps: [{ name: 'step-1', agent: 'agent-a', task: 'Do step 1', timeoutMs: 3_600_000 }], + }, + ], }); const run = await runner.execute(config, 'default'); @@ -510,14 +541,21 @@ describe('PR #511 E2E: Auto Step Owner + Review Gating', () => { { name: 'worker-2', cli: 'claude', role: 'implementer' }, { name: 'reviewer-1', cli: 'claude', role: 'reviewer' }, ], - workflows: [{ - name: 'default', - steps: [ - { name: 'work-1', agent: 'worker-1', task: 'Do task A' }, - { name: 'work-2', agent: 'worker-2', task: 'Do task B' }, - { name: 'lead-coord', agent: 'team-lead', task: 'Coordinate workers', dependsOn: ['work-1', 'work-2'] }, - ], - }], + workflows: [ + { + name: 'default', + steps: [ + { name: 'work-1', agent: 'worker-1', task: 'Do task A' }, + { name: 'work-2', agent: 'worker-2', task: 'Do task B' }, + { + name: 'lead-coord', + agent: 'team-lead', + task: 'Coordinate workers', + dependsOn: ['work-1', 'work-2'], + }, + ], + }, + ], }); const run = await runner.execute(config, 'default'); diff --git a/web/content/docs/event-handlers.mdx b/web/content/docs/event-handlers.mdx index d68db1a8c..faac71819 100644 --- a/web/content/docs/event-handlers.mdx +++ b/web/content/docs/event-handlers.mdx @@ -1,25 +1,30 @@ --- title: 'Event handlers' -description: 'Subscribe to relay events so your app can react to messages, agent lifecycle changes, output, and delivery updates.' +description: 'Subscribe to relay events so your app can react to messages, agent lifecycle changes, spawn intercepts, output, and delivery updates.' --- -Event handlers are the main way to observe what the relay is doing in real time. Attach callbacks to the `AgentRelay` instance to log activity, update UI, trigger follow-up work, or keep your own state in sync. +Event handlers are the main way to observe what the relay is doing in real time. Register listeners on the `AgentRelay` instance to log activity, update UI, trigger follow-up work, or keep your own state in sync. ## Basic pattern -Assign a function to subscribe. Set the handler back to `null` / `None` to clear it. +Use `addListener(event, handler)` to subscribe — it returns an unsubscribe function. Multiple listeners can register for the same event; they fire sequentially in registration order. Async handlers are awaited. Handler exceptions are caught and logged so one bad listener never blocks the others. ```typescript TypeScript const relay = new AgentRelay(); -relay.onMessageReceived = (msg) => { +const off = relay.addListener('messageReceived', (msg) => { console.log(`${msg.from} -> ${msg.to}: ${msg.text}`); -}; +}); -relay.onAgentReady = (agent) => { +relay.addListener('agentReady', (agent) => { console.log(`ready: ${agent.name}`); -}; +}); + +// Later, when you want to detach: +off(); +// or: +relay.removeListener('messageReceived', myHandler); ``` ```python Python @@ -30,19 +35,19 @@ relay.on_agent_ready = lambda agent: print(f"ready: {agent.name}") ``` -## Message handlers +The TypeScript SDK switched to a multi-listener registry in 7.0; if you're upgrading from 6.x see the [migration block below](#migrating-from-6x). -Use these when you want to react to communication events. +## Message events ```typescript TypeScript -relay.onMessageReceived = (msg) => { +relay.addListener('messageReceived', (msg) => { console.log('received', msg.threadId, msg.text); -}; +}); -relay.onMessageSent = (msg) => { +relay.addListener('messageSent', (msg) => { console.log('sent', msg.to, msg.text); -}; +}); ``` ```python Python @@ -51,42 +56,40 @@ relay.on_message_sent = lambda msg: print("sent", msg.to, msg.text) ``` -- `onMessageReceived` / `on_message_received`: fires when the relay delivers a message into your session. -- `onMessageSent` / `on_message_sent`: fires after your app or agent sends a message through the relay. - -## Agent lifecycle handlers +- `messageReceived` / `on_message_received`: fires when the relay delivers a message into your session. +- `messageSent` / `on_message_sent`: fires after your app or agent sends a message through the relay. -Use lifecycle handlers when you care about worker state transitions. +## Agent lifecycle events ```typescript TypeScript -relay.onAgentSpawned = (agent) => { +relay.addListener('agentSpawned', (agent) => { console.log(`spawned: ${agent.name}`); -}; +}); -relay.onAgentReady = (agent) => { +relay.addListener('agentReady', (agent) => { console.log(`ready: ${agent.name}`); -}; +}); -relay.onAgentReleased = (agent) => { +relay.addListener('agentReleased', (agent) => { console.log(`released: ${agent.name}`); -}; +}); -relay.onAgentExited = (agent) => { +relay.addListener('agentExited', (agent) => { console.log(`exited: ${agent.name}`, agent.exitCode, agent.exitSignal); -}; +}); -relay.onAgentExitRequested = ({ name, reason }) => { +relay.addListener('agentExitRequested', ({ name, reason }) => { console.log(`exit requested: ${name}`, reason); -}; +}); -relay.onAgentIdle = ({ name, idleSecs }) => { +relay.addListener('agentIdle', ({ name, idleSecs }) => { console.log(`idle: ${name}`, idleSecs); -}; +}); -relay.onAgentActivityChanged = ({ name, active, pendingDeliveries, reason }) => { +relay.addListener('agentActivityChanged', ({ name, active, pendingDeliveries, reason }) => { console.log(`activity: ${name}`, { active, pendingDeliveries, reason }); -}; +}); ``` ```python Python @@ -99,21 +102,64 @@ relay.on_agent_idle = lambda event: print(f"idle: {event['name']}", event["idleS ``` -These hooks are useful for status dashboards, retry logic, timeout nudges, and cleanup when a worker finishes or requests exit. +These are useful for status dashboards, retry logic, timeout nudges, and cleanup when a worker finishes or requests exit. + +## Call-site spawn / release hooks + +Unlike the broker-event listeners above (which fire *after* the broker emits an event), the four `before*` / `after*` hooks fire at the SDK call site — before and after the HTTP request to the broker. They're the right place to integrate cost-tracking tools, audit logs, or anything that needs to observe (or modify) the spawn input. + +```typescript +import { randomUUID } from 'node:crypto'; + +relay.addListener('beforeAgentSpawn', (ctx) => { + console.log(`spawning ${ctx.input.name} via ${ctx.input.cli}`); +}); + +relay.addListener('afterAgentSpawn', (ctx) => { + if (ctx.error) { + console.error(`spawn ${ctx.input.name} failed in ${ctx.durationMs}ms`, ctx.error); + } else { + console.log(`spawn ${ctx.result?.name} resolved in ${ctx.durationMs}ms`); + } +}); + +relay.addListener('beforeAgentRelease', (ctx) => { + console.log(`releasing ${ctx.name}`, ctx.reason); +}); + +relay.addListener('afterAgentRelease', (ctx) => { + console.log(`released ${ctx.name} in ${ctx.durationMs}ms`); +}); +``` + +### Mutating the spawn input + +A `beforeAgentSpawn` handler can **return a `SpawnPatch`** to merge into the spawn input before it reaches the broker. Patches are a shallow merge over the resolved input; when multiple handlers return patches they apply in registration order (later wins on conflict). For array fields like `args` / `channels` the patch *replaces* the array — spread the previous value to extend: -## Output and delivery handlers +```typescript +relay.addListener('beforeAgentSpawn', (ctx) => { + if (ctx.input.cli !== 'claude') return; // observe-only for non-Claude + const sessionId = randomUUID(); + // Preallocate a session id so cost-tracking can stamp it without sidecar matching. + return { + args: [...(ctx.input.args ?? []), '--session-id', sessionId], + }; +}); +``` -Use these when you need low-level visibility into what the relay is streaming. +`afterAgentSpawn` exposes the post-patch `resolvedInput` so observers can see exactly what was sent. `beforeAgentRelease` / `afterAgentRelease` are observe-only. + +## Output and delivery events ```typescript TypeScript -relay.onWorkerOutput = ({ name, stream, chunk }) => { +relay.addListener('workerOutput', ({ name, stream, chunk }) => { console.log(`[${name}] ${stream}: ${chunk}`); -}; +}); -relay.onDeliveryUpdate = (event) => { - console.log('delivery update', event.type, event); -}; +relay.addListener('deliveryUpdate', (event) => { + console.log('delivery update', event.kind, event); +}); ``` ```python Python @@ -122,25 +168,64 @@ relay.on_delivery_update = lambda event: print("delivery update", event["type"]) ``` -- `onWorkerOutput` / `on_worker_output`: raw stdout or stderr from a worker. -- `onDeliveryUpdate` / `on_delivery_update`: broker-level delivery events when you need transport visibility. -- `onAgentActivityChanged`: high-level derived activity signal for UI badges like “thinking” without hand-rolling delivery event inference. +- `workerOutput` / `on_worker_output`: raw stdout or stderr from a worker. +- `deliveryUpdate` / `on_delivery_update`: broker-level delivery events when you need transport visibility. +- `agentActivityChanged`: high-level derived activity signal for UI badges like "thinking" without hand-rolling delivery event inference. ```typescript const thinkingByAgent = new Map(); -relay.onAgentActivityChanged = ({ name, active }) => { +relay.addListener('agentActivityChanged', ({ name, active }) => { thinkingByAgent.set(name, active); renderThinkingBadge(name, active); -}; +}); ``` -## Good uses for handlers +## Channel subscribe / unsubscribe events + +```typescript +relay.addListener('channelSubscribed', ({ agent, channels }) => { + console.log(`${agent} subscribed to ${channels.join(', ')}`); +}); + +relay.addListener('channelUnsubscribed', ({ agent, channels }) => { + console.log(`${agent} unsubscribed from ${channels.join(', ')}`); +}); +``` + +## Migrating from 6.x + +The 6.x TypeScript SDK exposed each event as a single nullable callback field: + +```ts +// 6.x style — no longer compiles +relay.onAgentSpawned = (agent) => log(agent.name); +relay.onMessageReceived = null; +``` + +In 7.0 every event is on a typed multi-listener registry: + +```ts +// 7.0 style +const off = relay.addListener('agentSpawned', (agent) => log(agent.name)); +off(); // unsubscribe when done +``` + +Migration rules: + +- `relay.onXxx = handler;` → `relay.addListener('xxx', handler);` (lower-case first letter of the suffix). +- `relay.onXxx = null;` → either call the unsubscribe function returned from `addListener`, or use `relay.removeListener('xxx', handler)`. +- `relay.onChannelSubscribed = (agent, channels) => ...` and `relay.onChannelUnsubscribed = ...` now receive a single `{ agent, channels }` object instead of positional args. + +Per-call option callbacks like `spawnPersona({ onStart, onSuccess, onError })` are unchanged — those are scoped to a single invocation, not global hooks. + +## Good uses for listeners - Update a UI timeline or status panel without polling. - Capture logs and metrics while agents run. - Trigger follow-up work when a message arrives or a worker becomes ready. - Detect exit conditions and clean up local resources. +- Integrate cost-tracking or audit tools by pre-stamping spawns from `beforeAgentSpawn`. ## See also @@ -152,7 +237,7 @@ relay.onAgentActivityChanged = ({ name, active }) => { Start workers whose lifecycle you want to observe. - Full TypeScript API reference, including hook names and types. + Full TypeScript API reference, including event names and payload types. Full Python API reference for the same callbacks.