From fc11bf3f9d0616df6713aedf0f465f79e2bb1268 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 6 Jun 2026 14:40:54 +0200 Subject: [PATCH 1/6] Fix Slack integration event context preview --- .../integration-event-bridge.test.ts | 88 +++++++++++++++++-- src/main/integration-event-bridge.ts | 33 +++++-- 2 files changed, 109 insertions(+), 12 deletions(-) diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 1f1a59b..f8fc884 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -158,6 +158,7 @@ function makeHarness( content: string encoding: 'utf-8' | 'base64' } + readFileFailuresBeforeSuccess?: number failReadFile?: boolean sendDelayMs?: number onSendStart?: (activeSends: number) => void @@ -181,6 +182,7 @@ function makeHarness( const subscriptions: Subscription[] = [] let unsubscribedCount = 0 let activeSends = 0 + let readFileAttempts = 0 const bridge = new IntegrationEventBridge({ getWorkspaceHandle: async () => ({ @@ -195,6 +197,10 @@ function makeHarness( }, async readFile(workspaceId, path) { readFileCalls.push({ workspaceId, path }) + readFileAttempts += 1 + if (options.readFileFailuresBeforeSuccess && readFileAttempts <= options.readFileFailuresBeforeSuccess) { + throw new Error('remote file not ready') + } if (options.failReadFile) throw new Error('remote file not ready') return options.readFileResponse?.(workspaceId, path) ?? 
{ path, @@ -920,21 +926,89 @@ test('slack context falls back to expanded event data when targeted remote previ } }) } as ChangeEvent) - await waitForSent(harness, 1) + await waitForSent(harness, 1, 2_500) assert.match(harness.sent[0].input.text, /Slack message event/u) assert.match(harness.sent[0].input.text, /Author: Khaliq/u) assert.match(harness.sent[0].input.text, /Message:\nexpanded Slack context/u) - assert.deepEqual(harness.readFileCalls, [ - { - workspaceId: 'workspace-id', - path: messagePath - } - ]) + assert.equal(harness.readFileCalls.length, 4) + assert.deepEqual(harness.readFileCalls[0], { + workspaceId: 'workspace-id', + path: messagePath + }) assert.equal((harness.sent[0].input.data?.contextPreview as { kind?: string } | undefined)?.kind, 'text') assert.equal((harness.sent[0].input.data?.contextPreview as { content?: string } | undefined)?.content, undefined) }) +test('slack context retries targeted remote preview before falling back to sparse event data', async () => { + const harness = makeHarness(['alice'], { readFileFailuresBeforeSuccess: 1 }) + const messagePath = '/slack/channels/D123ABC/messages/1780668000_000000/meta.json' + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + downloadHistoricalData: false, + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + await harness.emit({ + ...changeEvent(messagePath, 'slack'), + expand: async () => ({ + level: 'full', + path: messagePath, + data: { + path: messagePath, + deleted: false + } + }) + } as ChangeEvent) + await waitForSent(harness, 1) + + assert.equal(harness.readFileCalls.length, 2) + assert.match(harness.sent[0].input.text, /Slack message event/u) + assert.match(harness.sent[0].input.text, /Message:\ntargeted Slack context/u) + assert.doesNotMatch(harness.sent[0].input.text, /"deleted": false/u) +}) + 
+test('slack context does not inject sparse relayfile pointer fallback as message content', async () => { + const harness = makeHarness(['alice'], { failReadFile: true }) + const messagePath = '/slack/channels/D123ABC/messages/1780668000_000000/meta.json' + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + downloadHistoricalData: false, + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + await harness.emit({ + ...changeEvent(messagePath, 'slack'), + expand: async () => ({ + level: 'full', + path: messagePath, + data: { + path: messagePath, + deleted: false + } + }) + } as ChangeEvent) + await waitForSent(harness, 1, 2_500) + + assert.match(harness.sent[0].input.text, /Message: unavailable; targeted context read did not return content\./u) + assert.doesNotMatch(harness.sent[0].input.text, /"path":/u) + assert.doesNotMatch(harness.sent[0].input.text, /"deleted": false/u) +}) + test('integration event targeted context previews skip binary files', async () => { const harness = makeHarness(['alice'], { readFileResponse: (_workspaceId, path) => ({ diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 829423c..6082877 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -44,6 +44,7 @@ const SLACK_DM_EVENT_GLOBS = [ '/slack/users/*/messages/**' ] const MAX_EVENT_CONTEXT_PREVIEW_BYTES = 32 * 1024 +const EVENT_CONTEXT_READ_RETRY_DELAYS_MS = [150, 350, 750] const MAX_DISPATCH_QUEUE_EVENTS = 50 const MAX_DISPATCH_SUMMARY_GROUPS = 10 const MAX_DISPATCHED_EVENTS_PER_SECOND = 25 @@ -325,6 +326,10 @@ function isUnauthorizedError(error: unknown): boolean { return typeof message === 'string' && /\b(401|403|unauthor|forbidden)\b/i.test(message) } +function delay(ms: number): Promise { + return new Promise((resolve) 
=> setTimeout(resolve, ms)) +} + function isRecord(value: unknown): value is Record { return !!value && typeof value === 'object' && !Array.isArray(value) } @@ -1542,10 +1547,21 @@ function eventContextPreviewFromBuffer(path: string, buffer: Buffer, contentType function eventContextPreviewFromData(path: string, data: unknown): EventContextPreview | undefined { if (data === undefined || data === null) return undefined + if (isSparseRelayfilePointerData(path, data)) return undefined const content = typeof data === 'string' ? data : JSON.stringify(data, null, 2) return eventContextPreviewFromBuffer(path, Buffer.from(content, 'utf8'), 'application/json') } +function isSparseRelayfilePointerData(path: string, data: unknown): boolean { + if (!isRecord(data)) return false + const keys = Object.keys(data).sort() + if (keys.length === 1 && keys[0] === 'path') return data.path === path + if (keys.length === 2 && keys[0] === 'deleted' && keys[1] === 'path') { + return data.path === path && typeof data.deleted === 'boolean' + } + return false +} + function eventContextPreviewMetadata(preview: EventContextPreview): EventContextPreviewMetadata { return { path: preview.path, @@ -1691,14 +1707,11 @@ function formatSlackIntegrationEventMessage( const author = slackPreviewAuthor(contextPreview) const lines = [ '', - 'Slack message event', - `Event id: ${event.id}`, - `Occurred at: ${event.occurredAt}` + 'Slack message event' ] if (scopeLabel) lines.push(`Location: ${scopeLabel}`) if (author) lines.push(`Author: ${author}`) - lines.push(`Path: ${projectPath}`) if (messageText) { lines.push('Message:', messageText) } else if (contextPreview?.kind === 'too-large') { @@ -1708,6 +1721,7 @@ function formatSlackIntegrationEventMessage( } else { lines.push('Message: unavailable; targeted context read did not return content.') } + lines.push(`Path: ${projectPath}`) lines.push('') return lines.join('\n') } @@ -2233,10 +2247,19 @@ export class IntegrationEventBridge { let readFileError: 
unknown try { + const readDelays = slackEventContextPath(path) ? [0, ...EVENT_CONTEXT_READ_RETRY_DELAYS_MS] : [0] const handle = await this.getWorkspaceHandle() const client = handle.client() if (typeof client.readFile === 'function') { - return eventContextPreviewFromFile(await client.readFile(handle.workspaceId, path)) + for (const [index, delayMs] of readDelays.entries()) { + if (delayMs > 0) await delay(delayMs) + try { + return eventContextPreviewFromFile(await client.readFile(handle.workspaceId, path)) + } catch (error) { + readFileError = error + if (index === readDelays.length - 1) break + } + } } } catch (error) { readFileError = error From 10b7203b82fe1c39ab0a945d723125d8262407a4 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 6 Jun 2026 14:45:25 +0200 Subject: [PATCH 2/6] Harden Slack integration alias dedupe --- .../integration-event-bridge.test.ts | 54 ++++++++++++++++++ src/main/integration-event-bridge.ts | 57 +++++++++++++++---- 2 files changed, 101 insertions(+), 10 deletions(-) diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index f8fc884..5a30d45 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -711,6 +711,60 @@ test('slack raw-id and slug alias paths with distinct revisions inject once per assert.deepEqual(harness.sent.map((message) => message.input.to), ['alice', 'alice', 'alice']) }) +test('slack raw-id and slug alias duplicates suppress when one context read is sparse', async () => { + const harness = makeHarness(['alice'], { + readFileResponse: (_workspaceId, path) => { + if (path.includes('__proj-cloud')) throw new Error('remote file not ready') + return { + path, + revision: 'rev-context', + contentType: 'application/json', + content: JSON.stringify({ provider: 'slack', text: 'readable Slack message' }), + encoding: 'utf-8' + } + } + }) + + await withMockedNow('2026-06-05T14:00:00.000Z', async 
() => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + downloadHistoricalData: false, + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + await harness.emit(changeEvent( + '/slack/channels/C123ABC/messages/1780668000_000000/meta.json', + 'slack', + { digest: 'revision:raw-copy' } + )) + await waitForSent(harness, 1) + + await harness.emit({ + ...changeEvent( + '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json', + 'slack', + { digest: 'revision:slug-copy' } + ), + expand: async () => ({ + level: 'full', + path: '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json', + data: { + path: '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json', + deleted: false + } + }) + } as ChangeEvent) + await waitForDropped('project-1', 1, 2_500) + + assert.equal(harness.sent.length, 1) + assert.match(harness.sent[0].input.text, /Message:\nreadable Slack message/u) +}) + test('remote replayed events older than the subscription session are dropped by default', async () => { const harness = makeHarness() diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 6082877..01d491c 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -97,6 +97,11 @@ type EventContextPreview = { type EventContextPreviewMetadata = Omit +type SlackLogicalInjectionState = { + expiresAt: number + contentHash?: string +} + type DispatchItem = { event: ChangeEvent specs: SubscriptionSpec[] @@ -1295,8 +1300,7 @@ function stableContentFingerprint(content: string): string { function eventDedupeKeyWithFingerprint( duplicateKey: string, - fingerprint: string | null, - contextPreview?: EventContextPreview + fingerprint: string | null ): { key: string; ttlMs: number } { if (!fingerprint) { return { @@ -1306,11 +1310,8 @@ function 
eventDedupeKeyWithFingerprint( } if (fingerprint.startsWith('slack:')) { - const contentAwareFingerprint = contextPreview?.kind === 'text' - ? `${fingerprint}:content:${stableContentFingerprint(contextPreview.content)}` - : fingerprint return { - key: `${duplicateKey}:change:${contentAwareFingerprint}`, + key: `${duplicateKey}:change:${fingerprint}`, ttlMs: SLACK_RECORD_REPLAY_TTL_MS } } @@ -2041,6 +2042,7 @@ export class IntegrationEventBridge { private subscriptions = new Map() private dispatchers = new Map() private recentInjections = new Map() + private slackLogicalInjections = new Map() private projectAgentRecipientCache = new Map() private notificationTargetCache = new Map() private brokerSendPacers = new Map() @@ -2369,7 +2371,7 @@ export class IntegrationEventBridge { // Release the dedupe key: a duplicate of this event (remote copy of a // local change, coalesced update) must be allowed to retry once a // recipient registers; otherwise the event is suppressed for the TTL. - if (dedupeClaimed) this.recentInjections.delete(dedupe.key) + if (dedupeClaimed) this.releaseDedupeKey(dedupe.key, needsSlackContentAwareDedupe) incrementIntegrationEventCounter(projectId, 'eventsDropped') warnIntegrationEventAggregated( `skipped no recipients:${projectId}`, @@ -2386,8 +2388,8 @@ export class IntegrationEventBridge { const eventMetadata = integrationEventMetadata(event) const contextPreview = await this.readEventContextPreview(projectId, event) if (needsSlackContentAwareDedupe) { - dedupe = eventDedupeKeyWithFingerprint(duplicateKey, fingerprint, contextPreview) - if (this.wasRecentlyInjected(dedupe.key, dedupe.ttlMs)) { + dedupe = eventDedupeKeyWithFingerprint(duplicateKey, fingerprint) + if (!this.claimSlackLogicalInjection(dedupe.key, contextPreview, dedupe.ttlMs)) { incrementIntegrationEventCounter(projectId, 'eventsDropped') logIntegrationEvent('skipped duplicate path', { projectId, @@ -2438,7 +2440,7 @@ export class IntegrationEventBridge { // No recipient got the 
event. Release the dedupe key so a duplicate of // this event (remote copy of a local change, coalesced update) retries // delivery instead of being dropped as a recent injection. - if (dedupeClaimed) this.recentInjections.delete(dedupe.key) + if (dedupeClaimed) this.releaseDedupeKey(dedupe.key, needsSlackContentAwareDedupe) throw sendErrors[0].error } if (sendErrors.length > 0) { @@ -2564,6 +2566,41 @@ export class IntegrationEventBridge { return false } + private claimSlackLogicalInjection( + key: string, + contextPreview: EventContextPreview | undefined, + ttlMs: number + ): boolean { + const now = Date.now() + for (const [entryKey, entry] of this.slackLogicalInjections.entries()) { + if (entry.expiresAt <= now) this.slackLogicalInjections.delete(entryKey) + } + + const contentHash = contextPreview?.kind === 'text' + ? stableContentFingerprint(contextPreview.content) + : undefined + const existing = this.slackLogicalInjections.get(key) + if (existing) { + if (!contentHash || !existing.contentHash || existing.contentHash === contentHash) { + return false + } + } + + this.slackLogicalInjections.set(key, { + expiresAt: now + ttlMs, + contentHash + }) + return true + } + + private releaseDedupeKey(key: string, isSlackLogicalKey: boolean): void { + if (isSlackLogicalKey) { + this.slackLogicalInjections.delete(key) + } else { + this.recentInjections.delete(key) + } + } + private async getWorkspaceHandle(): Promise { if (this.deps.getWorkspaceHandle) return this.deps.getWorkspaceHandle() const { accountWorkspaceReadyRetryOptions, getAccountWorkspaceId, refreshCloudAuth, resolveCloudAuth } = await import('./auth.ts') From 0cfc2039f7511c29a6cc0220ae810ac0275ae836 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 6 Jun 2026 14:51:20 +0200 Subject: [PATCH 3/6] Let blind Slack alias claims learn content hashes for edit dedupe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A blind Slack logical claim (context read returned nothing) 
suppressed the late content-bearing alias copy without recording its hash, so every genuine edit within the 1h replay TTL was also suppressed. The claim now learns the content hash from the suppressed copy (expiry unchanged), letting differing content inject again. Also gives waitForDropped a timeout parameter the sparse-read suppression test already relied on, and covers blind-first → suppressed copy → edit-injects ordering. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../integration-event-bridge.test.ts | 74 ++++++++++++++++++- src/main/integration-event-bridge.ts | 9 ++- 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 5a30d45..2872c88 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -456,8 +456,8 @@ async function waitForSent(harness: { sent: SentMessage[] }, count: number, time } } -async function waitForDropped(projectId: string, count: number): Promise { - const deadline = Date.now() + 1_000 +async function waitForDropped(projectId: string, count: number, timeoutMs = 1_000): Promise { + const deadline = Date.now() + timeoutMs while ((getIntegrationEventTelemetrySnapshot().projects[projectId]?.eventsDropped || 0) < count && Date.now() < deadline) { await new Promise((resolve) => setTimeout(resolve, 10)) } @@ -765,6 +765,76 @@ test('slack raw-id and slug alias duplicates suppress when one context read is s assert.match(harness.sent[0].input.text, /Message:\nreadable Slack message/u) }) +test('slack edits after a blind alias claim still inject once the content changes', async () => { + let messageText = 'original Slack message' + const harness = makeHarness(['alice'], { + readFileResponse: (_workspaceId, path) => { + if (!path.includes('__proj-cloud')) throw new Error('remote file not ready') + return { + path, + revision: 'rev-context', + contentType: 
'application/json', + content: JSON.stringify({ provider: 'slack', text: messageText }), + encoding: 'utf-8' + } + } + }) + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + downloadHistoricalData: false, + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + // Raw-id copy first: every targeted read fails and the expanded event only + // carries the sparse relayfile pointer, so the injection is blind. + await harness.emit({ + ...changeEvent( + '/slack/channels/C123ABC/messages/1780668000_000000/meta.json', + 'slack', + { digest: 'revision:raw-copy' } + ), + expand: async () => ({ + level: 'full', + path: '/slack/channels/C123ABC/messages/1780668000_000000/meta.json', + data: { + path: '/slack/channels/C123ABC/messages/1780668000_000000/meta.json', + deleted: false + } + }) + } as ChangeEvent) + await waitForSent(harness, 1, 2_500) + assert.equal(harness.sent.length, 1) + assert.match(harness.sent[0].input.text, /Message: unavailable; targeted context read did not return content\./u) + + // The slug alias copy of the same record carries content: suppressed as a + // duplicate, but the claim learns the content hash. + await harness.emit(changeEvent( + '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json', + 'slack', + { digest: 'revision:slug-copy' } + )) + await waitForDropped('project-1', 1, 2_500) + assert.equal(harness.sent.length, 1) + + // A genuine edit changes the content hash and must inject again. 
+ messageText = 'edited Slack message' + await harness.emit(changeEvent( + '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json', + 'slack', + { digest: 'revision:slug-edit' } + )) + await waitForSent(harness, 2) + assert.equal(harness.sent.length, 2) + assert.match(harness.sent[1].input.text, /Message:\nedited Slack message/u) +}) + test('remote replayed events older than the subscription session are dropped by default', async () => { const harness = makeHarness() diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 01d491c..256142e 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -2581,7 +2581,14 @@ export class IntegrationEventBridge { : undefined const existing = this.slackLogicalInjections.get(key) if (existing) { - if (!contentHash || !existing.contentHash || existing.contentHash === contentHash) { + if (!existing.contentHash && contentHash) { + // A blind claim (context read returned nothing) suppresses the late + // content-bearing alias copy, but must learn its hash so a genuine + // edit afterwards still injects instead of matching the blind claim. + existing.contentHash = contentHash + return false + } + if (!contentHash || existing.contentHash === contentHash) { return false } } From 435d78c2d90689891e771e607561f1f76fb85e44 Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 6 Jun 2026 14:59:54 +0200 Subject: [PATCH 4/6] Thread relay workspace key and broker name into cloud provisioning Cloud sandbox brokers previously always created their own relay workspace, isolating cloud agents from the project's local agents (#125). The local broker remains the workspace creator; its workspace_key is now exposed via brokerManager.workspaceKeyForProject() and sent best-effort on the provisioning POST /box together with a stable, pear-assigned broker instance name (cloud-<first 8 characters of the cloud agent id>), which cloud injects verbatim as AGENT_RELAY_WORKSPACE_KEY / AGENT_RELAY_BROKER_NAME. 
Identity is provision-time only: GET/PATCH bodies are unchanged. Local spawns honor an explicit AGENT_RELAY_WORKSPACE_KEY pin through a single typed seam until @agent-relay/harness-driver publishes workspaceKey in RuntimeSpawnOptions. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/main/broker.test.ts | 42 ++++++++++++++++++++++++ src/main/broker.ts | 29 ++++++++++++++++- src/main/cloud-agent.test.ts | 63 +++++++++++++++++++++++++++++++----- src/main/cloud-agent.ts | 23 +++++++++++-- 4 files changed, 145 insertions(+), 12 deletions(-) diff --git a/src/main/broker.test.ts b/src/main/broker.test.ts index 42afcc9..13c3763 100644 --- a/src/main/broker.test.ts +++ b/src/main/broker.test.ts @@ -375,6 +375,48 @@ describe('BrokerManager local + cloud coexistence', () => { await manager.shutdown() }) + it('passes an explicit workspace key env pin to local broker spawn options', async () => { + const previousWorkspaceKey = process.env.AGENT_RELAY_WORKSPACE_KEY + process.env.AGENT_RELAY_WORKSPACE_KEY = 'rk_live_pinned' + const manager = new BrokerManager() + + try { + await manager.start(PROJECT_ID, '/tmp/project-1', 'pear-project-1', undefined as never, []) + + expect(mock.HarnessDriverClient.spawn).toHaveBeenCalledWith(expect.objectContaining({ + brokerName: 'pear-project-1', + workspaceKey: 'rk_live_pinned' + })) + } finally { + if (previousWorkspaceKey === undefined) { + delete process.env.AGENT_RELAY_WORKSPACE_KEY + } else { + process.env.AGENT_RELAY_WORKSPACE_KEY = previousWorkspaceKey + } + await manager.shutdown() + } + }) + + it('reads the local broker workspace key for cloud provisioning', async () => { + const manager = new BrokerManager() + const local = await startLocal(manager) + local.getSession.mockResolvedValueOnce({ workspace_key: 'rk_live_project' }) + + await expect(manager.workspaceKeyForProject(PROJECT_ID)).resolves.toBe('rk_live_project') + + await manager.shutdown() + }) + + it('omits the project workspace key when no local broker exposes one', 
async () => { + const manager = new BrokerManager() + await startLocal(manager) + + await expect(manager.workspaceKeyForProject(PROJECT_ID)).resolves.toBeUndefined() + await expect(manager.workspaceKeyForProject('missing-project')).resolves.toBeUndefined() + + await manager.shutdown() + }) + it('reuses current harness-driver connection files instead of spawning another broker', async () => { const tempDir = await mkdtemp(join(tmpdir(), 'pear-current-connection-')) const connectionPath = join(tempDir, '.agentworkforce', 'relay', 'connection.json') diff --git a/src/main/broker.ts b/src/main/broker.ts index 9797cd4..71db34a 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -1304,12 +1304,18 @@ export class BrokerManager { console.warn('[broker] Agent Relay MCP command could not be resolved; broker will use its default MCP command') } - const opts: AgentRelaySpawnOptions = { + // Phase 1 of #125: the local broker stays the workspace creator, so the + // key is only threaded when explicitly pinned via env. The intersection + // type is the single cast site until @agent-relay/harness-driver ships + // workspaceKey in RuntimeSpawnOptions (T3) — drop it once published. + const explicitWorkspaceKey = process.env.AGENT_RELAY_WORKSPACE_KEY?.trim() || undefined + const opts: AgentRelaySpawnOptions & { workspaceKey?: string } = { cwd, brokerName: name, channels: nextChannels, binaryArgs: { persist: true }, binaryPath: resolveBundledBrokerBinary(), + ...(explicitWorkspaceKey ? { workspaceKey: explicitWorkspaceKey } : {}), env: { PATH: augmentedPath(), ...(agentRelayMcpCommand ? { AGENT_RELAY_MCP_COMMAND: agentRelayMcpCommand } : {}) @@ -1459,6 +1465,27 @@ export class BrokerManager { return { removed } } + /** + * The local broker creates the project's relay workspace; its workspace_key + * is what cloud sandbox brokers must join so local and cloud agents share + * one workspace (#125). 
Non-throwing: resolves undefined until a local + * session exists and exposes a key, so provisioning can proceed without it. + */ + async workspaceKeyForProject(projectId: string): Promise { + const normalizedProjectId = projectId.trim() + if (!normalizedProjectId) return undefined + const startPromise = this.startPromises.get(normalizedProjectId) + if (startPromise) await startPromise.catch(() => undefined) + const session = this.sessions.get(normalizedProjectId) + if (!session) return undefined + try { + const metadata = await session.client.getSession() + return metadata.workspace_key || undefined + } catch { + return undefined + } + } + /** * Attach to an already-provisioned cloud sandbox (used by CloudAgentManager * which warms the box via the cloud-agents/{id}/box endpoint). connectCloud diff --git a/src/main/cloud-agent.test.ts b/src/main/cloud-agent.test.ts index acab282..ababb61 100644 --- a/src/main/cloud-agent.test.ts +++ b/src/main/cloud-agent.test.ts @@ -65,7 +65,8 @@ const mock = vi.hoisted(() => { brokerManager: { onBrokerEvent: vi.fn(), attachCloudSandbox: vi.fn(async () => undefined), - detachCloudSandbox: vi.fn(async () => undefined) + detachCloudSandbox: vi.fn(async () => undefined), + workspaceKeyForProject: vi.fn(async () => undefined) }, fetch: vi.fn(async (url: string | URL | Request, init?: RequestInit) => { const normalizedUrl = String(url) @@ -221,6 +222,8 @@ describe('CloudAgentManager', () => { mock.brokerManager.onBrokerEvent.mockClear() mock.brokerManager.attachCloudSandbox.mockClear() mock.brokerManager.detachCloudSandbox.mockClear() + mock.brokerManager.workspaceKeyForProject.mockClear() + mock.brokerManager.workspaceKeyForProject.mockResolvedValue(undefined) mock.loadStore.mockClear() mock.saveStore.mockClear() mock.resolveCloudAuth.mockClear() @@ -261,6 +264,26 @@ describe('CloudAgentManager', () => { await Promise.resolve() } + function boxRequest(method: string): { url: string; init?: RequestInit } | undefined { + return 
mock.fetchCalls.find((call) => + call.init?.method === method && + call.url.includes('/cloud-agents/cloud-agent-1/box') + ) + } + + function boxRequestBody(method: string): Record { + const request = boxRequest(method) + if (!request?.init?.body) throw new Error(`missing ${method} box request body`) + return JSON.parse(String(request.init.body)) as Record + } + + function expectBoxPostBody(expected: Record): void { + expect(boxRequestBody('POST')).toEqual({ + brokerName: 'cloud-cloud-ag', + ...expected + }) + } + it('keeps a newly created cloud agent visible while the cloud list catches up', async () => { const manager = new CloudAgentManager() @@ -302,14 +325,28 @@ describe('CloudAgentManager', () => { expect(boxPost?.url).toBe( 'https://cloud.example/api/v1/workspaces/account-workspace-id/cloud-agents/cloud-agent-1/box?async=true' ) - expect(JSON.parse(String(boxPost?.init?.body))).toEqual({ + expectBoxPostBody({ relayfileMountPaths: ['/integrations/github', '/workspace'] }) + expect(boxRequestBody('POST')).not.toHaveProperty('workspaceKey') expect(boxPost?.url).not.toContain('relay-workspace-id') expect(mock.fetchCalls.filter((call) => call.url.endsWith('/api/v1/auth/whoami'))).toHaveLength(1) expect(mock.mountInputs[0]?.workspaceId).toBe('relay-workspace-id') }) + it('passes the local relay workspace key and stable cloud broker name when warming a box', async () => { + mock.brokerManager.workspaceKeyForProject.mockResolvedValueOnce('rk_live_project') + const manager = new CloudAgentManager() + + await manager.attach('project-1', 'cloud-agent-1') + + expect(mock.brokerManager.workspaceKeyForProject).toHaveBeenCalledWith('project-1') + expectBoxPostBody({ + relayfileMountPaths: ['/integrations/github', '/workspace'], + workspaceKey: 'rk_live_project' + }) + }) + it('reuses a warm-on-intent box when attach is clicked', async () => { const manager = new CloudAgentManager() @@ -373,8 +410,7 @@ describe('CloudAgentManager', () => { await manager.attach('project-1', 
'cloud-agent-1') - const boxPost = mock.fetchCalls.find((call) => call.init?.method === 'POST') - expect(JSON.parse(String(boxPost?.init?.body))).toEqual({ + expectBoxPostBody({ relayfileMountPaths: ['/integrations/github', '/workspace'], workspaceSource: expect.objectContaining({ kind: 'git-overlay', @@ -428,8 +464,7 @@ describe('CloudAgentManager', () => { await manager.attach('project-1', 'cloud-agent-1') - const boxPost = mock.fetchCalls.find((call) => call.init?.method === 'POST') - expect(JSON.parse(String(boxPost?.init?.body))).toEqual({ + expectBoxPostBody({ relayfileMountPaths: ['/integrations/github', '/workspace'], workspaceSource: { kind: 'git-overlay', @@ -449,8 +484,7 @@ describe('CloudAgentManager', () => { await manager.attach('project-1', 'cloud-agent-1') - const boxPost = mock.fetchCalls.find((call) => call.init?.method === 'POST') - expect(JSON.parse(String(boxPost?.init?.body))).toEqual({ + expectBoxPostBody({ relayfileMountPaths: ['/integrations/github'], workspaceSource: expect.objectContaining({ kind: 'git', @@ -534,4 +568,17 @@ describe('CloudAgentManager', () => { await expect(manager.attach('project-1', 'cloud-agent-1')).rejects.toThrow('broker failed to start') }) + + it('keeps mount-path PATCH bodies scoped to mount paths only', async () => { + mock.brokerManager.workspaceKeyForProject.mockResolvedValueOnce('rk_live_project') + const manager = new CloudAgentManager() + await manager.attach('project-1', 'cloud-agent-1') + mock.fetchCalls.length = 0 + + await manager.updateMountPaths('project-1', ['/integrations/slack']) + + expect(boxRequestBody('PATCH')).toEqual({ + relayfileMountPaths: ['/integrations/slack', '/workspace'] + }) + }) }) diff --git a/src/main/cloud-agent.ts b/src/main/cloud-agent.ts index 7d5f2e1..434233e 100644 --- a/src/main/cloud-agent.ts +++ b/src/main/cloud-agent.ts @@ -88,6 +88,7 @@ type CloudBrokerAdapter = { connectCloudSandbox?: (projectId: string, sandbox: CloudAgentSandbox, win?: BrowserWindow) => Promise 
detachCloudSandbox?: (projectId: string) => Promise onBrokerEvent?: (handler: (projectId: string, event: BrokerEvent) => void) => () => void + workspaceKeyForProject?: (projectId: string) => Promise } type CloudBrokerSystemMessageAdapter = { @@ -1077,7 +1078,18 @@ export class CloudAgentManager { ? integrationMountPaths : [SANDBOX_WORKSPACE_PATH, ...integrationMountPaths] const url = `${auth.apiUrl}/api/v1/workspaces/${encodeURIComponent(workspaceId)}/cloud-agents/${encodeURIComponent(cloudAgentId)}/box` - let sandbox = await this.fetchBox(url, auth.accessToken, 'POST', mountPaths, workspaceSource) + // #125: the sandbox broker joins the project's local relay workspace + // under a name pear knows ahead of time (cloud injects both verbatim as + // AGENT_RELAY_WORKSPACE_KEY / AGENT_RELAY_BROKER_NAME). The key is + // best-effort: without a local session the sandbox keeps creating its own + // workspace, exactly as before. + const workspaceKey = await (brokerManager as unknown as CloudBrokerAdapter) + .workspaceKeyForProject?.(projectId) + const relayBroker = { + ...(workspaceKey ? { workspaceKey } : {}), + brokerName: `cloud-${cloudAgentId.slice(0, 8)}` + } + let sandbox = await this.fetchBox(url, auth.accessToken, 'POST', mountPaths, workspaceSource, relayBroker) options.onSandbox?.(sandbox) if (options.isCancelled?.()) { await this.deleteBox(toBinding(projectId, cloudAgentId, sandbox)).catch(() => undefined) @@ -1139,12 +1151,17 @@ export class CloudAgentManager { accessToken: string, method: 'GET' | 'POST', mountPaths?: string[], - workspaceSource?: CloudAgentWorkspaceSource + workspaceSource?: CloudAgentWorkspaceSource, + relayBroker?: { workspaceKey?: string; brokerName?: string } ): Promise { + // Broker identity is provision-time only: POST carries it, PATCH/GET never + // do (cloud preserves the injected env across mount-path rewrites). const body = method === 'POST' ? 
JSON.stringify({ relayfileMountPaths: normalizeMountPaths(mountPaths || []), - ...(workspaceSource && workspaceSource.kind !== 'relayfile' ? { workspaceSource } : {}) + ...(workspaceSource && workspaceSource.kind !== 'relayfile' ? { workspaceSource } : {}), + ...(relayBroker?.workspaceKey ? { workspaceKey: relayBroker.workspaceKey } : {}), + ...(relayBroker?.brokerName ? { brokerName: relayBroker.brokerName } : {}) }) : undefined const requestUrl = method === 'POST' ? withAsyncWarm(url) : url From 95d31e4301abfe18b73d9937b46140a04b1d6c9e Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 6 Jun 2026 15:06:52 +0200 Subject: [PATCH 5/6] Surface strict-join failures and add multi-instance Stage 1 spec The broker's explicit workspace join (#125, relay 6419d59c) fails loud instead of creating a fresh workspace; classifyWorkspaceJoinFailure() keys on the contract strings so the start error status distinguishes fatal key rejection (401/403) from retryable rate limiting (429). Cast site comment updated: the drop is verified against the built 8.3.0+T3 harness-driver types and rides the published version bump. Also adds the #127 Stage 1 read-only observer design spec with the relay/cloud rulings folded in. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...26-06-06-multi-instance-stage1-observer.md | 200 ++++++++++++++++++ src/main/broker.ts | 26 ++- 2 files changed, 223 insertions(+), 3 deletions(-) create mode 100644 docs/specs/2026-06-06-multi-instance-stage1-observer.md diff --git a/docs/specs/2026-06-06-multi-instance-stage1-observer.md b/docs/specs/2026-06-06-multi-instance-stage1-observer.md new file mode 100644 index 0000000..b9ea43b --- /dev/null +++ b/docs/specs/2026-06-06-multi-instance-stage1-observer.md @@ -0,0 +1,200 @@ +# Multi-Instance Stage 1: Read-Only Observer — Design Spec + +Issue: #127 (Stage 1 of 3). 
Prerequisites: #125 (explicit workspace join + named +broker instances — pear side landed `435d78c`, relay/cloud halves in flight), +#126 (remote host support — not started). + +Status: DESIGN ONLY. Written 2026-06-06 during the #125 build-out; contract +references below are to the locked #125 naming contract. + +## Goal + +A second Pear instance (same user, different machine — multi-user comes with +invite scoping later in this stage) can open a project that is "hosted" +elsewhere, see the same agent graph, and watch live terminal output. It cannot +send PTY input, spawn/stop agents, or mutate project settings. + +## Non-goals (Stage 1) + +- No write path of any kind from the observer (Stage 2). +- No per-user permission levels beyond owner/observer (Stage 3 adds editor). +- No presence avatars/notifications UI beyond a minimal "N instances connected" + indicator (Stage 3). +- No CRDT/merge for project definitions: single-writer (the host instance), + observers treat shared state as read-only snapshots. +- No web observer; Electron only. + +## Foundations this builds on (all #125 vintage) + +| Primitive | Where | Why it matters here | +|---|---|---| +| Explicit workspace join | relay `--workspace-key` / `AGENT_RELAY_WORKSPACE_KEY`, fail-loud on invalid key | Observer joins the project workspace; MUST hard-fail rather than silently create a fresh one (the #125 failure mode) | +| Named broker instances | `--instance-name` / `AGENT_RELAY_BROKER_NAME`; `RuntimeSpawnOptions.workspaceKey?/brokerName?` | Instance identity. 
Observers are addressable, and ownership checks key off the name | +| Workspace key seam | `brokerManager.workspaceKeyForProject(projectId)` (broker.ts) | The host-side source of truth an invite token wraps | +| Remote attach primitive | `attachCloudSandbox()` connects by `execUrl + apiKey` (broker.ts:1467) | The observer's connection path to the host broker is the same shape (#126 generalizes it) | +| Event dedupe discipline | `slackLogicalInjections` canonical-identity claims (integration-event-bridge.ts) | Fan-out to N instances multiplies the duplicate-delivery surface; reuse logical-identity claims, never per-copy revisions | + +## Instance naming + +Local broker names are currently `pear-${project.relayWorkspaceId}` +(project-store.ts:58) — project-stable but NOT instance-unique; two instances +on one project collide. This was explicitly deferred out of #125. Stage 1 is +where it lands: + +- Name = `pear-${project.relayWorkspaceId}-${machineSlug}` where machineSlug = + hostname-derived, 8 chars, persisted in local app config on first run + (NOT regenerated per session — names must be stable for ownership checks). +- The HOST instance keeps the legacy un-suffixed name working via PEAR + METADATA, not broker-side aliasing (relay-worker ruling, 2026-06-06): the + shared project definition publishes both `brokerName` (canonical suffixed) + and `legacyBrokerName`; consumers resolve through the definition. Broker-side + aliasing would disturb the just-stabilized strict-name registration + semantics and is rejected. + +## 1. Shared project definitions + +Authoritative project definition moves to the relay workspace as a single +relayfile document: `/pear/project.json` (channels, integration scopes, roots, +host assignment, schema version). Rationale for relayfile over a new relay +cloud API: sync, change events, and conflict surface already exist; observers +already need relayfile access for mirrors. + +- Host instance: writes `/pear/project.json` on every local mutation + (debounced).
Local `projects.json` stays the cache/bootstrap copy. +- Observer instance: subscribes to `/pear/project.json` change events + (the same `subscribe()` machinery the integration-event bridge uses); applies + snapshots read-only; never writes. +- Conflict rule (Stage 1): host wins, always — observers don't write, so the + only conflict is host vs stale cache, resolved by `revision` compare on open. +- Schema versioned (`schemaVersion: 1`); observer with unknown newer version + shows "upgrade required" instead of guessing. + +## 2. Invite / join flow + +Stage 1 keeps tokens same-account (multi-user scoping is the second half of +this stage, gated on relay-side scoped tokens): + +``` +InviteToken = base64url(JSON{ + v: 1, + workspaceKey, // from workspaceKeyForProject(projectId) + relayWorkspaceId, // account workspace (cloud API URL construction) + hostBrokerName, // addressing + ownership root + brokerUrl?, // #126 remote-host URL when available; absent = cloud-relay discovery + role: 'observer' +}) +``` + +- Generate: host instance, new IPC `workspace.invite(projectId)` → token string + (UI: copy button in project settings). +- Join: `workspace.join(token)` → validates schema/version → spawns/joins + observer-side broker session with `workspaceKey` + its own instance name → + fetches `/pear/project.json` → materializes a read-only project entry in the + local store (flagged `origin: 'shared'`, `role: 'observer'`). +- Fail-loud inheritance: a bad/expired key surfaces the broker's strict-join + error verbatim. No fallback to create. The broker distinguishes fatal + rejection (401/403 — "rejected") from rate-limiting (429 — "rate-limited", + HTTP status preserved through AuthHttpError): the join UI treats the former + as a bad invite and the latter as retryable with backoff. 
+- Token carries no bearer secret beyond the workspace key in Stage 1 + (same-account); the multi-user variant swaps `workspaceKey` for a relay-issued + scoped token and is EXPLICITLY out of scope until relay exposes one. + +## 3. Read-only enforcement + +Two layers, because UI-only enforcement is not enforcement: + +1. **Pear layer (UX):** project entries with `role: 'observer'` get + permission-aware guards in the renderer stores — spawn/stop/input/settings + actions disabled with tooltips. IPC handlers for mutating calls check the + role flag and reject (`ROLE_OBSERVER_READONLY` error) so a buggy renderer + can't mutate either. +2. **Broker layer (authority):** per relay-worker (2026-06-06) the right home + is a readonly capability on the CONNECTION/API layer — a flag in the local + HTTP/WS connection/session context that rejects mutating REST endpoints and + delivery/spawn/release/write actions, while the host connection keeps + normal identity. The lease API is explicitly the wrong layer (leases govern + broker lifetime, not caller authority). Effort: moderate; scheduled with + relay, not in Stage 1's critical path. Stage 1 ships with pear-layer + enforcement only; the trust boundary is then "same account", acceptable for + same-user Stage 1, NOT for multi-user invites (hard gate: multi-user waits + for the broker-side capability). + +## 4. PTY fan-out + +The broker already supports multiple clients; #125 makes both instances land in +one workspace. Stage 1 needs verification + hardening, not new plumbing: + +- Test matrix: 2 instances × (local host, remote host) × (agent spawn before / + after observer join) — observer must receive output chunks for agents + spawned both before and after it connected. Catch-up contract per + relay-worker (2026-06-06): current visible-screen PTY snapshot (existing + snapshot/state machinery) + live stream from join. 
There is NO durable + per-observer scrollback contract — historical replay is out of scope and the + UI labels the point where the observer's view begins. +- Duplicate-event hardening per AGENTS.md guidance: PTY chunks are + sequence-numbered per (agentName, ptyId); pear-side consumer drops + already-seen sequence numbers — same canonical-identity discipline as the + slack dedupe, trivially cheaper (monotonic seq, not content hashes). +- Broker events (`agent_spawned`, `agent_exited`, …) fan out to all instances; + observer applies them to its read-only graph. Event `instanceName` field + (from the #125 named-instance work) distinguishes "who did that" for the + Stage 3 presence layer — carried but unused in Stage 1. + +## 5. Minimal presence (Stage 1 slice) + +- Each instance publishes `{instanceName, role, joinedAt}` on a relaycast + channel `#pear-presence-` on join, tombstone on clean exit, + TTL-expired by peers on silence (heartbeat every 60s). +- UI: "2 instances connected" pill on the project header. Nothing else. +- This channel is also Stage 2's coordination root (ownership claims), so the + message schema gets a `kind` discriminator from day one. + +## IPC / type additions + +- `workspace.invite(projectId) → string` +- `workspace.join(token) → { projectId }` +- `workspace.onPresenceUpdate(projectId, instances[])` +- Project record: `origin: 'local' | 'shared'`, `role: 'owner' | 'observer'`, + `hostBrokerName?`, `sharedRevision?` +- Mutating IPC handlers gain the role guard (single helper, applied at the + handler boundary, not per-store). 
+ +## Dependencies / sequencing + +``` +#125 relay strict-join + named instances [in flight, T3] +#125 cloud verbatim env injection [in flight, T4] +#126 remote host (broker URL for observer) [not started — Stage 1 can demo + against a cloud-sandbox host first] +relay: scoped invite tokens [needed for multi-user only] +relay: broker-side readonly capability [needed for multi-user; stub OK same-user] +``` + +Buildable order inside Stage 1: instance-name uniqueness → shared +project.json (host write path, observer read path) → invite/join IPC + UI → +PTY fan-out verification → presence slice. Each lands behind a +`PEAR_MULTI_INSTANCE` flag until the stage is coherent. + +## Open questions (for #general before implementation) + +1. relay-worker: name alias for the legacy un-suffixed host broker name vs + pear publishing both names — which is cheaper broker-side? (Resolved 2026-06-06: pear publishes both names and broker-side aliasing is rejected — see "Instance naming".) +2. relay-worker: per-connection readonly capability — connection API or + lease API? Effort estimate decides whether same-user Stage 1 waits for it. (Resolved 2026-06-06: connection/API layer, not leases; same-user Stage 1 ships with pear-layer enforcement only — see section 3.) +3. cloud-lead: does `/pear/project.json` in the account workspace collide with + any cloud-side relayfile path conventions/reserved prefixes? +4. PTY backfill: does the broker keep enough scrollback per PTY to replay on + attach, or is "live from join" the Stage 1 contract? (Resolved 2026-06-06: current visible-screen snapshot plus live stream from join; no durable scrollback replay — see section 4.) + +## Test plan sketch + +- Unit: invite token round-trip (incl. version/role rejection), role guard on + every mutating IPC handler (table-driven), project.json snapshot apply + + revision conflict. +- Integration: two BrokerManager instances against one broker (the + broker.test.ts harness already mocks spawn; extend with a second client), + PTY seq dedupe under interleaved chunks, observer join while agent mid-run. +- Manual/e2e (needs T2-style debug logging): second machine joins via token, + watches a live agent, kill -9 the host instance → observer survives in + read-only state with stale-host indicator.
diff --git a/src/main/broker.ts b/src/main/broker.ts index 71db34a..ed64f94 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -139,6 +139,18 @@ export function resolveAgentRelayMcpCommand(): string | undefined { }) } +// Strict-join failures from the broker (#125): an explicitly pinned workspace +// key never falls back to creating a fresh workspace. Auth rejection (401/403, +// "was rejected") is fatal — the key is bad or revoked; rate limiting (429, +// "was rate-limited") is retryable. Contract strings from agent-relay-broker +// relaycast/auth.rs, verified in the T3 review. +export function classifyWorkspaceJoinFailure(err: unknown): 'rejected' | 'rate-limited' | null { + const message = toErrorMessage(err) + if (/explicit workspace key .* was rate-limited/iu.test(message)) return 'rate-limited' + if (/explicit workspace key .* was rejected/iu.test(message)) return 'rejected' + return null +} + function parseAgentWorkforceJson(output: string, label: string): T { try { return JSON.parse(output) as T @@ -1306,8 +1318,10 @@ export class BrokerManager { // Phase 1 of #125: the local broker stays the workspace creator, so the // key is only threaded when explicitly pinned via env. The intersection - // type is the single cast site until @agent-relay/harness-driver ships - // workspaceKey in RuntimeSpawnOptions (T3) — drop it once published. + // type is the single cast site until @agent-relay/harness-driver + // PUBLISHES workspaceKey in RuntimeSpawnOptions (landed relay-side in + // 6419d59c; verified against the built 8.3.0+T3 dist locally) — the + // intersection erases to a no-op then and drops with the version bump. 
const explicitWorkspaceKey = process.env.AGENT_RELAY_WORKSPACE_KEY?.trim() || undefined const opts: AgentRelaySpawnOptions & { workspaceKey?: string } = { cwd, @@ -1362,7 +1376,13 @@ export class BrokerManager { return await startPromise } catch (err) { console.error(`[broker] Failed to start for project ${normalizedProjectId}:`, err) - this.sendStatusToWindow(win, normalizedProjectId, 'error', String(err)) + const joinFailure = classifyWorkspaceJoinFailure(err) + const statusMessage = joinFailure === 'rate-limited' + ? `Workspace join rate-limited (retryable): ${String(err)}` + : joinFailure === 'rejected' + ? `Workspace key rejected — broker refused to create a fresh workspace: ${String(err)}` + : String(err) + this.sendStatusToWindow(win, normalizedProjectId, 'error', statusMessage) throw err } finally { if (this.startPromises.get(normalizedProjectId) === startPromise) { From 8350eb0b361e82696a299407585b1daf25aec46a Mon Sep 17 00:00:00 2001 From: kjgbot Date: Sat, 6 Jun 2026 15:16:02 +0200 Subject: [PATCH 6/6] Detect cloud sandbox brokers that ignore the workspace key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A sandbox broker predating the strict-join contract (#125) silently ignores the injected AGENT_RELAY_WORKSPACE_KEY and creates an isolated workspace — the exact failure the contract fixed, reborn through a stale snapshot bake. The warm path now records the workspace key it actually sent per project and attachCloudSandbox compares it against the sandbox session's workspace_key: a mismatch logs loudly and publishes a cloud_workspace_key_mismatch broker event carrying 8-char prefixes of both keys, so a stale-binary deployment is attributable from logs alone. Keyless and legacy warms stay silent, and detach clears the per-project key so re-attaches cannot compare against a stale value. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/main/broker.test.ts | 73 ++++++++++++++++++++++++++++++++++++ src/main/broker.ts | 39 ++++++++++++++++++- src/main/cloud-agent.test.ts | 56 +++++++++++++++++++++++++++ src/main/cloud-agent.ts | 18 ++++++++- 4 files changed, 184 insertions(+), 2 deletions(-) diff --git a/src/main/broker.test.ts b/src/main/broker.test.ts index 13c3763..3e4a75a 100644 --- a/src/main/broker.test.ts +++ b/src/main/broker.test.ts @@ -74,6 +74,7 @@ const mock = vi.hoisted(() => { connectedClients: [] as MockClient[], nextLocalAgents: [] as string[], nextCloudAgents: [] as string[], + nextCloudSessionMetadata: [] as Array>, nextConnectedAgents: [] as string[], nextConnectedSessionErrors: [] as Error[] } @@ -97,6 +98,10 @@ const mock = vi.hoisted(() => { constructor() { const client = createMockClient(state.nextCloudAgents.splice(0)) + const metadata = state.nextCloudSessionMetadata.shift() + if (metadata) { + client.getSession.mockResolvedValueOnce(metadata) + } state.constructedClients.push(client) // Re-key `this` as the mock client. 
return client as unknown as HarnessDriverClient @@ -334,6 +339,7 @@ describe('BrokerManager local + cloud coexistence', () => { mock.state.connectedClients.length = 0 mock.state.nextLocalAgents = [] mock.state.nextCloudAgents = [] + mock.state.nextCloudSessionMetadata = [] mock.state.nextConnectedAgents = [] mock.state.nextConnectedSessionErrors = [] mock.HarnessDriverClient.spawn.mockClear() @@ -417,6 +423,73 @@ describe('BrokerManager local + cloud coexistence', () => { await manager.shutdown() }) + it('emits a cloud workspace mismatch event when the sandbox ignores the sent key', async () => { + const manager = new BrokerManager() + const win = createMockWindow() + mock.state.nextCloudSessionMetadata.push({ workspace_key: 'rk_sand_456' }) + + await manager.attachCloudSandbox(PROJECT_ID, { + sandboxId: 'sandbox-1', + execUrl: 'https://sandbox.example', + sentWorkspaceKey: 'rk_sent_123' + }, win) + + expect(win.webContents.send).toHaveBeenCalledWith( + 'broker:event', + expect.objectContaining({ + kind: 'cloud_workspace_key_mismatch', + projectId: PROJECT_ID, + cloudSandboxId: 'sandbox-1', + sentWorkspaceKeyPrefix: 'rk_sent_', + sandboxWorkspaceKeyPrefix: 'rk_sand_', + detail: expect.stringContaining('stale broker binary') + }) + ) + + await manager.shutdown() + }) + + it('does not emit a cloud workspace mismatch event when the sandbox joins the sent key', async () => { + const manager = new BrokerManager() + const win = createMockWindow() + mock.state.nextCloudSessionMetadata.push({ workspace_key: 'rk_live_same' }) + + await manager.attachCloudSandbox(PROJECT_ID, { + sandboxId: 'sandbox-1', + execUrl: 'https://sandbox.example', + sentWorkspaceKey: 'rk_live_same' + }, win) + + const mismatchEvents = (win.webContents.send as ReturnType).mock.calls + .filter(([channel, payload]) => + channel === 'broker:event' && + (payload as { kind?: string }).kind === 'cloud_workspace_key_mismatch' + ) + expect(mismatchEvents).toHaveLength(0) + + await manager.shutdown() + }) + + 
it('does not emit a cloud workspace mismatch event on keyless legacy attaches', async () => { + const manager = new BrokerManager() + const win = createMockWindow() + mock.state.nextCloudSessionMetadata.push({ workspace_key: 'rk_live_sandbox' }) + + await manager.attachCloudSandbox(PROJECT_ID, { + sandboxId: 'sandbox-1', + execUrl: 'https://sandbox.example' + }, win) + + const mismatchEvents = (win.webContents.send as ReturnType).mock.calls + .filter(([channel, payload]) => + channel === 'broker:event' && + (payload as { kind?: string }).kind === 'cloud_workspace_key_mismatch' + ) + expect(mismatchEvents).toHaveLength(0) + + await manager.shutdown() + }) + it('reuses current harness-driver connection files instead of spawning another broker', async () => { const tempDir = await mkdtemp(join(tmpdir(), 'pear-current-connection-')) const connectionPath = join(tempDir, '.agentworkforce', 'relay', 'connection.json') diff --git a/src/main/broker.ts b/src/main/broker.ts index ed64f94..6eb3e7d 100644 --- a/src/main/broker.ts +++ b/src/main/broker.ts @@ -313,6 +313,13 @@ export interface CloudAgentSandboxHandle { execUrl: string apiKey?: string relayfileMountPath?: string + /** + * The relay workspace key provisioning actually sent on POST /box (#125). + * Set only when the warm path resolved a local key — its presence arms the + * attach-time tripwire that detects a sandbox broker silently ignoring + * AGENT_RELAY_WORKSPACE_KEY (stale, pre-strict-join binary). + */ + sentWorkspaceKey?: string } type BrokerEventObserver = (projectId: string, event: BrokerEvent) => void @@ -1562,7 +1569,28 @@ export class BrokerManager { baseUrl: execUrl, ...(apiKey ? { apiKey } : {}) }) - await client.getSession() + const sessionMetadata = await client.getSession() + + // #125 tripwire: provisioning asked this sandbox broker to JOIN an + // explicit workspace. A different key in the session means the broker + // ignored AGENT_RELAY_WORKSPACE_KEY (a pre-strict-join binary, e.g. 
a + // stale snapshot bake) and silently created an isolated workspace — + // exactly the failure #125 fixed. Compare only when a key was actually + // sent; prefixes keep the event diagnosable from logs without leaking + // whole keys. + const sentWorkspaceKey = handle.sentWorkspaceKey?.trim() || undefined + const sandboxWorkspaceKey = sessionMetadata.workspace_key || undefined + const workspaceKeyMismatch = !!sentWorkspaceKey && sandboxWorkspaceKey !== sentWorkspaceKey + if (workspaceKeyMismatch) { + console.error( + `[broker] Cloud sandbox broker ignored workspace key for project ${normalizedProjectId} — stale broker binary?`, + { + sandboxId, + sentWorkspaceKeyPrefix: sentWorkspaceKey?.slice(0, 8), + sandboxWorkspaceKeyPrefix: sandboxWorkspaceKey?.slice(0, 8) ?? '(none)' + } + ) + } const eventStreamGeneration = this.nextEventStreamGeneration() const unsubEvent = this.attachClient(sessionKey, client, win, eventStreamGeneration) @@ -1591,6 +1619,15 @@ export class BrokerManager { }) client.connectEvents() + if (workspaceKeyMismatch) { + this.publishBrokerEvent(sessionKey, normalizedProjectId, win, { + kind: 'cloud_workspace_key_mismatch', + cloudSandboxId: sandboxId, + sentWorkspaceKeyPrefix: sentWorkspaceKey?.slice(0, 8) ?? null, + sandboxWorkspaceKeyPrefix: sandboxWorkspaceKey?.slice(0, 8) ?? null, + detail: 'sandbox broker ignored AGENT_RELAY_WORKSPACE_KEY — stale broker binary?' 
+ }) + } this.publishBrokerEvent(sessionKey, normalizedProjectId, win, { kind: 'broker_initialized', name: `cloud-${normalizedProjectId}`, diff --git a/src/main/cloud-agent.test.ts b/src/main/cloud-agent.test.ts index ababb61..fe52e63 100644 --- a/src/main/cloud-agent.test.ts +++ b/src/main/cloud-agent.test.ts @@ -345,6 +345,62 @@ describe('CloudAgentManager', () => { relayfileMountPaths: ['/integrations/github', '/workspace'], workspaceKey: 'rk_live_project' }) + expect(mock.brokerManager.attachCloudSandbox).toHaveBeenCalledWith( + 'project-1', + expect.objectContaining({ + sandboxId: 'sandbox-1', + sentWorkspaceKey: 'rk_live_project' + }), + undefined + ) + }) + + it('does not pass a sent workspace key for keyless warms', async () => { + const manager = new CloudAgentManager() + + await manager.attach('project-1', 'cloud-agent-1') + + expect(mock.brokerManager.attachCloudSandbox).toHaveBeenCalledWith( + 'project-1', + expect.not.objectContaining({ + sentWorkspaceKey: expect.anything() + }), + undefined + ) + }) + + it('clears the sent workspace key when a cloud agent detaches', async () => { + mock.brokerManager.workspaceKeyForProject.mockResolvedValueOnce('rk_live_project') + const manager = new CloudAgentManager() + await manager.attach('project-1', 'cloud-agent-1') + await manager.detach('project-1') + mock.brokerManager.attachCloudSandbox.mockClear() + + await (manager as unknown as { + connectBroker: (projectId: string, sandbox: { + sandboxId: string + execUrl: string + filesUrl: string + relayfileToken: string + relayfileMountPath: string + status: 'ready' + }) => Promise + }).connectBroker('project-1', { + sandboxId: 'sandbox-2', + execUrl: 'https://sandbox-2.example', + filesUrl: 'https://sandbox-2.example/files', + relayfileToken: 'relayfile-token-2', + relayfileMountPath: '/remote/project-1', + status: 'ready' + }) + + expect(mock.brokerManager.attachCloudSandbox).toHaveBeenCalledWith( + 'project-1', + expect.not.objectContaining({ + sentWorkspaceKey: 
expect.anything() + }), + undefined + ) }) it('reuses a warm-on-intent box when attach is clicked', async () => { diff --git a/src/main/cloud-agent.ts b/src/main/cloud-agent.ts index 434233e..2f711b7 100644 --- a/src/main/cloud-agent.ts +++ b/src/main/cloud-agent.ts @@ -470,6 +470,10 @@ export class CloudAgentManager { private appliedConflictPolicies = new Map() private mountRestartPromises = new Map>() private workspaceSources = new Map() + // Relay workspace keys actually sent on POST /box, per project — arms the + // attach-time stale-broker tripwire (#125). Tracked here because the + // sandbox object is replaced by warm-poll GETs between warm and attach. + private sentWorkspaceKeys = new Map() private prewarms = new Map() private canceledAttaches = new Set() private eventHandlers = new Set<(event: CloudAgentEvent) => void>() @@ -659,6 +663,7 @@ export class CloudAgentManager { this.lastSettledAt.delete(normalizedProjectId) this.appliedConflictPolicies.delete(normalizedProjectId) this.workspaceSources.delete(normalizedProjectId) + this.sentWorkspaceKeys.delete(normalizedProjectId) this.persistCloudAgent(normalizedProjectId, null) this.emit({ type: 'mount-status', projectId: normalizedProjectId, mount: toMountStatus(null) }) } @@ -1085,6 +1090,11 @@ export class CloudAgentManager { // workspace, exactly as before. const workspaceKey = await (brokerManager as unknown as CloudBrokerAdapter) .workspaceKeyForProject?.(projectId) + if (workspaceKey) { + this.sentWorkspaceKeys.set(projectId, workspaceKey) + } else { + this.sentWorkspaceKeys.delete(projectId) + } const relayBroker = { ...(workspaceKey ? 
{ workspaceKey } : {}), brokerName: `cloud-${cloudAgentId.slice(0, 8)}` @@ -1281,7 +1291,13 @@ export class CloudAgentManager { const broker = brokerManager as unknown as CloudBrokerAdapter const attach = broker.attachCloudSandbox || broker.connectCloudSandbox if (attach) { - await attach.call(brokerManager, projectId, sandbox, win) + const sentWorkspaceKey = this.sentWorkspaceKeys.get(projectId) + await attach.call( + brokerManager, + projectId, + sentWorkspaceKey ? { ...sandbox, sentWorkspaceKey } : sandbox, + win + ) return }