diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 04bbfacd..1f0b1bf0 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -1,5 +1,5 @@ import assert from 'node:assert/strict' -import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises' +import { mkdir, mkdtemp, rm, stat, writeFile } from 'node:fs/promises' import { tmpdir } from 'node:os' import { join } from 'node:path' import { beforeEach, test } from 'node:test' @@ -15,7 +15,8 @@ import { localWatchEventPathsForFilename, localWatchRootsFor, relayfileSdkPathFiltersFor, - resetIntegrationEventTelemetryForTests + resetIntegrationEventTelemetryForTests, + subscriptionSpecsFor } from '../integration-event-bridge.ts' import type { ConnectedIntegration } from '../integrations.ts' @@ -150,6 +151,16 @@ async function waitUntil(predicate: () => boolean, timeoutMs = 2_000): Promise { + const deadline = Date.now() + timeoutMs + while (Date.now() <= deadline) { + const stats = await stat(path).catch(() => null) + if (!stats) return + await new Promise((resolve) => setTimeout(resolve, 10)) + } + assert.equal(await stat(path).then(() => true).catch(() => false), false) +} + function makeHarness( agents = ['alice', 'bob'], options: { @@ -1416,6 +1427,64 @@ test('slack context falls back to expanded event data when targeted remote previ assert.equal((harness.sent[0].input.data?.contextPreview as { content?: string } | undefined)?.content, undefined) }) +test('slack DM context uses materialized local file when targeted remote preview is missing', async () => { + const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-event-preview-')) + const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'users', 'U0ADJH4P83T', 'messages') + const messagePath = '/slack/users/U0ADJH4P83T/messages/1780905125_300069/meta.json' + await mkdir(join(localRoot, '1780905125_300069'), { recursive: true }) + await writeFile( + join(localRoot, '1780905125_300069', 'meta.json'), + JSON.stringify({ + provider: 'slack', + text: 'local Slack DM context', + user: 'U123', + dm_user_id: 'U0ADJH4P83T' + }) + ) + + try { + const harness = makeHarness(['alice'], { failReadFile: true }) + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/users/U0ADJH4P83T/messages'], + localMountPaths: [localRoot], + downloadHistoricalData: false, + scope: { listenDms: true, notifyAgents: ['alice'] } + }) + ]) + }) + + await harness.emit({ + ...changeEvent(messagePath, 'slack'), + expand: async () => ({ + level: 'full', + path: messagePath, + data: { + path: messagePath, + deleted: false + } + }) + } as ChangeEvent) + await waitForSent(harness, 1, 2_500) + + assert.match(harness.sent[0].input.text, /Slack message event/u) + assert.match(harness.sent[0].input.text, /Location: User U0ADJH4P83T/u) + assert.match(harness.sent[0].input.text, /Author: U123/u) + assert.match(harness.sent[0].input.text, /Message:\nlocal Slack DM context/u) + assert.doesNotMatch(harness.sent[0].input.text, /Message: unavailable/u) + assert.equal( + (harness.sent[0].input.data?.contextPreview as { path?: string } | undefined)?.path, + messagePath + ) + } finally { + await rm(workspaceRoot, { recursive: true, force: true }) + } +}) + test('slack context retries targeted remote preview before falling back to sparse event data', async () => { const harness = makeHarness(['alice'], { readFileFailuresBeforeSuccess: 1 }) const messagePath = '/slack/channels/D123ABC/messages/1780668000_000000/meta.json' @@ -1830,6 +1899,30 @@ test('local fallback watchers use the shared Slack users command-root grammar', ) }) +test('subscription specs include concrete local roots for Slack user message mounts', () => { + const localRoot = join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'users', 'U123', 'messages') + const specs = subscriptionSpecsFor( + [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/users/U123/messages'], + localMountPaths: [localRoot], + scope: { listenDms: true } + }) + ], + 'workspace-id' + ) + + assert.equal(specs.length, 1) + assert.deepEqual(specs[0].localMountRoots, [ + { + localRoot, + remoteRoot: '/slack/users/U123/messages' + } + ]) +}) + test('local watcher path construction does not duplicate remote path segments', () => { const messageLocalRoot = join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'channels', 'C0AD7UU0J1G', 'messages', '1780019742_971719') const messageRemoteRoot = '/slack/channels/C0AD7UU0J1G/messages/1780019742_971719' @@ -2021,6 +2114,68 @@ test('integration events ignore index, discovery, tmp, dotfile, and local writeb assert.deepEqual(harness.listAgentsCalls, []) }) +test('confirmed Slack writeback success removes the local draft command file', async () => { + const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-writeback-cleanup-')) + const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages') + const localDraftPath = join(localRoot, 'reply-confirmed.json') + const remoteDraftPath = '/slack/channels/C123ABC/messages/reply-confirmed.json' + await mkdir(localRoot, { recursive: true }) + await writeFile(localDraftPath, JSON.stringify({ text: 'confirmed send' })) + + try { + const harness = makeHarness(['alice']) + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC/messages'], + localMountPaths: [localRoot], + scope: { notifyAgents: ['alice'] } + }) + ]) + + await harness.emit({ + ...changeEvent(remoteDraftPath, 'slack'), + type: 'writeback.succeeded' + } as ChangeEvent) + await waitForPathMissing(localDraftPath) + + assert.deepEqual(harness.sent, []) + } finally { + await rm(workspaceRoot, { recursive: true, force: true }) + } +}) + +test('Slack writeback draft cleanup waits for confirmed dispatch', async () => { + const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-writeback-cleanup-')) + const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages') + const localDraftPath = join(localRoot, 'reply-pending.json') + const remoteDraftPath = '/slack/channels/C123ABC/messages/reply-pending.json' + await mkdir(localRoot, { recursive: true }) + await writeFile(localDraftPath, JSON.stringify({ text: 'pending send' })) + + try { + const harness = makeHarness(['alice']) + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC/messages'], + localMountPaths: [localRoot], + scope: { notifyAgents: ['alice'] } + }) + ]) + + await harness.emit(changeEvent(remoteDraftPath, 'slack')) + await waitForDispatcherTick() + + assert.equal(await stat(localDraftPath).then(() => true).catch(() => false), true) + assert.deepEqual(harness.sent, []) + } finally { + await rm(workspaceRoot, { recursive: true, force: true }) + } +}) + test('integration events notify nested non-numeric Slack message records', async () => { const harness = makeHarness() diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 3ab23da2..3ed57f37 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -1,6 +1,6 @@ import { createHash } from 'node:crypto' import { existsSync, watch, type FSWatcher } from 'node:fs' -import { appendFile, mkdir, readFile, stat } from 'node:fs/promises' +import { appendFile, mkdir, readFile, rm, stat } from 'node:fs/promises' import { homedir } from 'node:os' import { dirname, isAbsolute, join, relative, resolve, sep } from 'node:path' import { @@ -568,7 +568,7 @@ function targetLabels(targets: DeliveryTargets): string[] { return [...targets.agents.map((agent) => `@${agent}`), ...targets.channels] } -function subscriptionSpecsFor( +export function subscriptionSpecsFor( integrations: ConnectedIntegration[], localMountWorkspaceId?: string ): SubscriptionSpec[] { @@ -1869,6 +1869,36 @@ async function readLocalEventContextPreview( return undefined } +async function cleanupConfirmedSlackWritebackDraft( + projectId: string, + event: ChangeEvent, + specs: SubscriptionSpec[] +): Promise { + if (event.type !== 'writeback.succeeded') return + const remotePath = eventSummaryValue(event.resource.path) + if (!remotePath || !isLikelyLocalWritebackCommandPath(remotePath)) return + + for (const spec of specs) { + if (spec.provider !== 'slack') continue + for (const root of spec.localMountRoots) { + if (!isSlackWritebackCommandRoot(root.remoteRoot)) continue + if (!pathIsInsideMount(remotePath, root.remoteRoot)) continue + const localPath = localPathForRemotePathInsideRoot(root.localRoot, root.remoteRoot, remotePath) + if (!localPathIsInsideRoot(root.localRoot, localPath)) continue + const stats = await stat(localPath).catch(() => null) + if (!stats || stats.isDirectory()) continue + await rm(localPath, { force: true }) + logIntegrationEvent('confirmed Slack writeback draft cleaned', { + projectId, + eventId: event.id, + remotePath, + localRoot: root.localRoot + }) + return + } + } +} + function slackScopeLabel(path: string): string | undefined { const segments = pathSegments(path) const channelIndex = segments.indexOf('channels') @@ -2331,6 +2361,18 @@ export class IntegrationEventBridge { type: event.type, path: event.resource.path }) + void cleanupConfirmedSlackWritebackDraft(projectId, event, specs).catch((error) => { + warnIntegrationEventAggregated( + `confirmed writeback cleanup failed:${projectId}`, + 'confirmed writeback cleanup failed', + { + projectId, + eventId: event.id, + path: event.resource.path, + error: toErrorMessage(error) + } + ) + }) void this.enqueueEvent(projectId, event, specs, { source: 'remote', subscriptionStartedAtMs: remoteSubscriptionStartedAtMs, diff --git a/src/main/integrations.test.ts b/src/main/integrations.test.ts index 73fb8e25..6e7f421d 100644 --- a/src/main/integrations.test.ts +++ b/src/main/integrations.test.ts @@ -757,6 +757,38 @@ describe('IntegrationsManager', () => { ) }) + it('passes current local mount paths into integration event reconciliation', async () => { + mock.store.projects[0].integrations[0] = { + ...mock.store.projects[0].integrations[0], + mountPaths: ['/slack/users/U0ADJH4P83T/messages'], + scope: { listenDms: true }, + subscribeAgent: true + } + mock.integrationMountManager.currentWorkspaceId.mockReturnValue('workspace-id') + mock.integrationMountManager.localPathsFor.mockReturnValue([ + '/tmp/relayfile/workspace-id/slack/users/U0ADJH4P83T/messages' + ]) + const manager = new IntegrationsManager() + + await manager.refreshAgentState('project-1') + + expect(mock.integrationEventBridge.reconcile).toHaveBeenCalledWith( + 'project-1', + [ + expect.objectContaining({ + provider: 'slack', + integrationId: 'slack-integration-1', + mountPaths: ['/slack/users/U0ADJH4P83T/messages'], + localMountPaths: ['/tmp/relayfile/workspace-id/slack/users/U0ADJH4P83T/messages'] + }) + ] + ) + expect(mock.integrationMountManager.localPathsFor).toHaveBeenCalledWith('workspace-id', { + provider: 'slack', + mountPaths: ['/discovery/slack', '/slack/users/U0ADJH4P83T/messages'] + }) + }) + it('retries active integration event subscriptions after startup mount hydration', async () => { mock.store.projects[0].integrations[0].subscribeAgent = true const manager = new IntegrationsManager() diff --git a/src/main/integrations.ts b/src/main/integrations.ts index 0c3fa835..ee2778c0 100644 --- a/src/main/integrations.ts +++ b/src/main/integrations.ts @@ -2028,7 +2028,7 @@ export class IntegrationsManager { await integrationEventBridge.closeAllExcept(projectId) await integrationEventBridge.reconcile( projectId, - this.visibleIntegrationsForProject(projectId) + this.withCurrentLocalMountPaths(this.visibleIntegrationsForProject(projectId)) ) return true } catch (error) { @@ -2155,10 +2155,7 @@ export class IntegrationsManager { })) } - private async withLocalMountPaths(integrations: ConnectedIntegration[]): Promise { - await this.syncLocalMounts({ hydrateCloud: false }).catch((error) => { - if (!isIntegrationAuthRecoveryError(error)) throw error - }) + private withCurrentLocalMountPaths(integrations: ConnectedIntegration[]): ConnectedIntegration[] { const workspaceId = integrationMountManager.currentWorkspaceId() if (!workspaceId) return integrations return integrations.map((integration) => { @@ -2172,6 +2169,13 @@ export class IntegrationsManager { }) } + private async withLocalMountPaths(integrations: ConnectedIntegration[]): Promise { + await this.syncLocalMounts({ hydrateCloud: false }).catch((error) => { + if (!isIntegrationAuthRecoveryError(error)) throw error + }) + return this.withCurrentLocalMountPaths(integrations) + } + private async resolveIntegrationMountPath(projectId: string, integrationId: string, targetPath: string): Promise { const resolvedPath = resolve(targetPath) const localIntegrations = await this.withLocalMountPaths(this.listConnected(projectId))