Skip to content

Commit d719ade

Browse files
authored
Fix integration event local previews (#159)
1 parent ee1e6c8 commit d719ade

4 files changed

Lines changed: 242 additions & 9 deletions

File tree

src/main/__tests__/integration-event-bridge.test.ts

Lines changed: 157 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import assert from 'node:assert/strict'
2-
import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises'
2+
import { mkdir, mkdtemp, rm, stat, writeFile } from 'node:fs/promises'
33
import { tmpdir } from 'node:os'
44
import { join } from 'node:path'
55
import { beforeEach, test } from 'node:test'
@@ -15,7 +15,8 @@ import {
1515
localWatchEventPathsForFilename,
1616
localWatchRootsFor,
1717
relayfileSdkPathFiltersFor,
18-
resetIntegrationEventTelemetryForTests
18+
resetIntegrationEventTelemetryForTests,
19+
subscriptionSpecsFor
1920
} from '../integration-event-bridge.ts'
2021
import type { ConnectedIntegration } from '../integrations.ts'
2122

@@ -150,6 +151,16 @@ async function waitUntil(predicate: () => boolean, timeoutMs = 2_000): Promise<v
150151
assert.equal(predicate(), true)
151152
}
152153

154+
async function waitForPathMissing(path: string, timeoutMs = 2_000): Promise<void> {
155+
const deadline = Date.now() + timeoutMs
156+
while (Date.now() <= deadline) {
157+
const stats = await stat(path).catch(() => null)
158+
if (!stats) return
159+
await new Promise((resolve) => setTimeout(resolve, 10))
160+
}
161+
assert.equal(await stat(path).then(() => true).catch(() => false), false)
162+
}
163+
153164
function makeHarness(
154165
agents = ['alice', 'bob'],
155166
options: {
@@ -1416,6 +1427,64 @@ test('slack context falls back to expanded event data when targeted remote previ
14161427
assert.equal((harness.sent[0].input.data?.contextPreview as { content?: string } | undefined)?.content, undefined)
14171428
})
14181429

1430+
test('slack DM context uses materialized local file when targeted remote preview is missing', async () => {
1431+
const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-event-preview-'))
1432+
const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'users', 'U0ADJH4P83T', 'messages')
1433+
const messagePath = '/slack/users/U0ADJH4P83T/messages/1780905125_300069/meta.json'
1434+
await mkdir(join(localRoot, '1780905125_300069'), { recursive: true })
1435+
await writeFile(
1436+
join(localRoot, '1780905125_300069', 'meta.json'),
1437+
JSON.stringify({
1438+
provider: 'slack',
1439+
text: 'local Slack DM context',
1440+
user: 'U123',
1441+
dm_user_id: 'U0ADJH4P83T'
1442+
})
1443+
)
1444+
1445+
try {
1446+
const harness = makeHarness(['alice'], { failReadFile: true })
1447+
1448+
await withMockedNow('2026-06-05T14:00:00.000Z', async () => {
1449+
await harness.bridge.reconcile('project-1', [
1450+
integration({
1451+
provider: 'slack',
1452+
integrationId: 'slack-1',
1453+
mountPaths: ['/slack/users/U0ADJH4P83T/messages'],
1454+
localMountPaths: [localRoot],
1455+
downloadHistoricalData: false,
1456+
scope: { listenDms: true, notifyAgents: ['alice'] }
1457+
})
1458+
])
1459+
})
1460+
1461+
await harness.emit({
1462+
...changeEvent(messagePath, 'slack'),
1463+
expand: async () => ({
1464+
level: 'full',
1465+
path: messagePath,
1466+
data: {
1467+
path: messagePath,
1468+
deleted: false
1469+
}
1470+
})
1471+
} as ChangeEvent)
1472+
await waitForSent(harness, 1, 2_500)
1473+
1474+
assert.match(harness.sent[0].input.text, /Slack message event/u)
1475+
assert.match(harness.sent[0].input.text, /Location: User U0ADJH4P83T/u)
1476+
assert.match(harness.sent[0].input.text, /Author: U123/u)
1477+
assert.match(harness.sent[0].input.text, /Message:\nlocal Slack DM context/u)
1478+
assert.doesNotMatch(harness.sent[0].input.text, /Message: unavailable/u)
1479+
assert.equal(
1480+
(harness.sent[0].input.data?.contextPreview as { path?: string } | undefined)?.path,
1481+
messagePath
1482+
)
1483+
} finally {
1484+
await rm(workspaceRoot, { recursive: true, force: true })
1485+
}
1486+
})
1487+
14191488
test('slack context retries targeted remote preview before falling back to sparse event data', async () => {
14201489
const harness = makeHarness(['alice'], { readFileFailuresBeforeSuccess: 1 })
14211490
const messagePath = '/slack/channels/D123ABC/messages/1780668000_000000/meta.json'
@@ -1830,6 +1899,30 @@ test('local fallback watchers use the shared Slack users command-root grammar',
18301899
)
18311900
})
18321901

1902+
test('subscription specs include concrete local roots for Slack user message mounts', () => {
1903+
const localRoot = join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'users', 'U123', 'messages')
1904+
const specs = subscriptionSpecsFor(
1905+
[
1906+
integration({
1907+
provider: 'slack',
1908+
integrationId: 'slack-1',
1909+
mountPaths: ['/slack/users/U123/messages'],
1910+
localMountPaths: [localRoot],
1911+
scope: { listenDms: true }
1912+
})
1913+
],
1914+
'workspace-id'
1915+
)
1916+
1917+
assert.equal(specs.length, 1)
1918+
assert.deepEqual(specs[0].localMountRoots, [
1919+
{
1920+
localRoot,
1921+
remoteRoot: '/slack/users/U123/messages'
1922+
}
1923+
])
1924+
})
1925+
18331926
test('local watcher path construction does not duplicate remote path segments', () => {
18341927
const messageLocalRoot = join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'channels', 'C0AD7UU0J1G', 'messages', '1780019742_971719')
18351928
const messageRemoteRoot = '/slack/channels/C0AD7UU0J1G/messages/1780019742_971719'
@@ -2021,6 +2114,68 @@ test('integration events ignore index, discovery, tmp, dotfile, and local writeb
20212114
assert.deepEqual(harness.listAgentsCalls, [])
20222115
})
20232116

2117+
test('confirmed Slack writeback success removes the local draft command file', async () => {
2118+
const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-writeback-cleanup-'))
2119+
const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages')
2120+
const localDraftPath = join(localRoot, 'reply-confirmed.json')
2121+
const remoteDraftPath = '/slack/channels/C123ABC/messages/reply-confirmed.json'
2122+
await mkdir(localRoot, { recursive: true })
2123+
await writeFile(localDraftPath, JSON.stringify({ text: 'confirmed send' }))
2124+
2125+
try {
2126+
const harness = makeHarness(['alice'])
2127+
await harness.bridge.reconcile('project-1', [
2128+
integration({
2129+
provider: 'slack',
2130+
integrationId: 'slack-1',
2131+
mountPaths: ['/slack/channels/C123ABC/messages'],
2132+
localMountPaths: [localRoot],
2133+
scope: { notifyAgents: ['alice'] }
2134+
})
2135+
])
2136+
2137+
await harness.emit({
2138+
...changeEvent(remoteDraftPath, 'slack'),
2139+
type: 'writeback.succeeded'
2140+
} as ChangeEvent)
2141+
await waitForPathMissing(localDraftPath)
2142+
2143+
assert.deepEqual(harness.sent, [])
2144+
} finally {
2145+
await rm(workspaceRoot, { recursive: true, force: true })
2146+
}
2147+
})
2148+
2149+
test('Slack writeback draft cleanup waits for confirmed dispatch', async () => {
2150+
const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-writeback-cleanup-'))
2151+
const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages')
2152+
const localDraftPath = join(localRoot, 'reply-pending.json')
2153+
const remoteDraftPath = '/slack/channels/C123ABC/messages/reply-pending.json'
2154+
await mkdir(localRoot, { recursive: true })
2155+
await writeFile(localDraftPath, JSON.stringify({ text: 'pending send' }))
2156+
2157+
try {
2158+
const harness = makeHarness(['alice'])
2159+
await harness.bridge.reconcile('project-1', [
2160+
integration({
2161+
provider: 'slack',
2162+
integrationId: 'slack-1',
2163+
mountPaths: ['/slack/channels/C123ABC/messages'],
2164+
localMountPaths: [localRoot],
2165+
scope: { notifyAgents: ['alice'] }
2166+
})
2167+
])
2168+
2169+
await harness.emit(changeEvent(remoteDraftPath, 'slack'))
2170+
await waitForDispatcherTick()
2171+
2172+
assert.equal(await stat(localDraftPath).then(() => true).catch(() => false), true)
2173+
assert.deepEqual(harness.sent, [])
2174+
} finally {
2175+
await rm(workspaceRoot, { recursive: true, force: true })
2176+
}
2177+
})
2178+
20242179
test('integration events notify nested non-numeric Slack message records', async () => {
20252180
const harness = makeHarness()
20262181

src/main/integration-event-bridge.ts

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { createHash } from 'node:crypto'
22
import { existsSync, watch, type FSWatcher } from 'node:fs'
3-
import { appendFile, mkdir, readFile, stat } from 'node:fs/promises'
3+
import { appendFile, mkdir, readFile, rm, stat } from 'node:fs/promises'
44
import { homedir } from 'node:os'
55
import { dirname, isAbsolute, join, relative, resolve, sep } from 'node:path'
66
import {
@@ -568,7 +568,7 @@ function targetLabels(targets: DeliveryTargets): string[] {
568568
return [...targets.agents.map((agent) => `@${agent}`), ...targets.channels]
569569
}
570570

571-
function subscriptionSpecsFor(
571+
export function subscriptionSpecsFor(
572572
integrations: ConnectedIntegration[],
573573
localMountWorkspaceId?: string
574574
): SubscriptionSpec[] {
@@ -1869,6 +1869,36 @@ async function readLocalEventContextPreview(
18691869
return undefined
18701870
}
18711871

1872+
async function cleanupConfirmedSlackWritebackDraft(
1873+
projectId: string,
1874+
event: ChangeEvent,
1875+
specs: SubscriptionSpec[]
1876+
): Promise<void> {
1877+
if (event.type !== 'writeback.succeeded') return
1878+
const remotePath = eventSummaryValue(event.resource.path)
1879+
if (!remotePath || !isLikelyLocalWritebackCommandPath(remotePath)) return
1880+
1881+
for (const spec of specs) {
1882+
if (spec.provider !== 'slack') continue
1883+
for (const root of spec.localMountRoots) {
1884+
if (!isSlackWritebackCommandRoot(root.remoteRoot)) continue
1885+
if (!pathIsInsideMount(remotePath, root.remoteRoot)) continue
1886+
const localPath = localPathForRemotePathInsideRoot(root.localRoot, root.remoteRoot, remotePath)
1887+
if (!localPathIsInsideRoot(root.localRoot, localPath)) continue
1888+
const stats = await stat(localPath).catch(() => null)
1889+
if (!stats || stats.isDirectory()) continue
1890+
await rm(localPath, { force: true })
1891+
logIntegrationEvent('confirmed Slack writeback draft cleaned', {
1892+
projectId,
1893+
eventId: event.id,
1894+
remotePath,
1895+
localRoot: root.localRoot
1896+
})
1897+
return
1898+
}
1899+
}
1900+
}
1901+
18721902
function slackScopeLabel(path: string): string | undefined {
18731903
const segments = pathSegments(path)
18741904
const channelIndex = segments.indexOf('channels')
@@ -2331,6 +2361,18 @@ export class IntegrationEventBridge {
23312361
type: event.type,
23322362
path: event.resource.path
23332363
})
2364+
void cleanupConfirmedSlackWritebackDraft(projectId, event, specs).catch((error) => {
2365+
warnIntegrationEventAggregated(
2366+
`confirmed writeback cleanup failed:${projectId}`,
2367+
'confirmed writeback cleanup failed',
2368+
{
2369+
projectId,
2370+
eventId: event.id,
2371+
path: event.resource.path,
2372+
error: toErrorMessage(error)
2373+
}
2374+
)
2375+
})
23342376
void this.enqueueEvent(projectId, event, specs, {
23352377
source: 'remote',
23362378
subscriptionStartedAtMs: remoteSubscriptionStartedAtMs,

src/main/integrations.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,38 @@ describe('IntegrationsManager', () => {
757757
)
758758
})
759759

760+
it('passes current local mount paths into integration event reconciliation', async () => {
761+
mock.store.projects[0].integrations[0] = {
762+
...mock.store.projects[0].integrations[0],
763+
mountPaths: ['/slack/users/U0ADJH4P83T/messages'],
764+
scope: { listenDms: true },
765+
subscribeAgent: true
766+
}
767+
mock.integrationMountManager.currentWorkspaceId.mockReturnValue('workspace-id')
768+
mock.integrationMountManager.localPathsFor.mockReturnValue([
769+
'/tmp/relayfile/workspace-id/slack/users/U0ADJH4P83T/messages'
770+
])
771+
const manager = new IntegrationsManager()
772+
773+
await manager.refreshAgentState('project-1')
774+
775+
expect(mock.integrationEventBridge.reconcile).toHaveBeenCalledWith(
776+
'project-1',
777+
[
778+
expect.objectContaining({
779+
provider: 'slack',
780+
integrationId: 'slack-integration-1',
781+
mountPaths: ['/slack/users/U0ADJH4P83T/messages'],
782+
localMountPaths: ['/tmp/relayfile/workspace-id/slack/users/U0ADJH4P83T/messages']
783+
})
784+
]
785+
)
786+
expect(mock.integrationMountManager.localPathsFor).toHaveBeenCalledWith('workspace-id', {
787+
provider: 'slack',
788+
mountPaths: ['/discovery/slack', '/slack/users/U0ADJH4P83T/messages']
789+
})
790+
})
791+
760792
it('retries active integration event subscriptions after startup mount hydration', async () => {
761793
mock.store.projects[0].integrations[0].subscribeAgent = true
762794
const manager = new IntegrationsManager()

src/main/integrations.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2028,7 +2028,7 @@ export class IntegrationsManager {
20282028
await integrationEventBridge.closeAllExcept(projectId)
20292029
await integrationEventBridge.reconcile(
20302030
projectId,
2031-
this.visibleIntegrationsForProject(projectId)
2031+
this.withCurrentLocalMountPaths(this.visibleIntegrationsForProject(projectId))
20322032
)
20332033
return true
20342034
} catch (error) {
@@ -2155,10 +2155,7 @@ export class IntegrationsManager {
21552155
}))
21562156
}
21572157

2158-
private async withLocalMountPaths(integrations: ConnectedIntegration[]): Promise<ConnectedIntegration[]> {
2159-
await this.syncLocalMounts({ hydrateCloud: false }).catch((error) => {
2160-
if (!isIntegrationAuthRecoveryError(error)) throw error
2161-
})
2158+
private withCurrentLocalMountPaths(integrations: ConnectedIntegration[]): ConnectedIntegration[] {
21622159
const workspaceId = integrationMountManager.currentWorkspaceId()
21632160
if (!workspaceId) return integrations
21642161
return integrations.map((integration) => {
@@ -2172,6 +2169,13 @@ export class IntegrationsManager {
21722169
})
21732170
}
21742171

2172+
private async withLocalMountPaths(integrations: ConnectedIntegration[]): Promise<ConnectedIntegration[]> {
2173+
await this.syncLocalMounts({ hydrateCloud: false }).catch((error) => {
2174+
if (!isIntegrationAuthRecoveryError(error)) throw error
2175+
})
2176+
return this.withCurrentLocalMountPaths(integrations)
2177+
}
2178+
21752179
private async resolveIntegrationMountPath(projectId: string, integrationId: string, targetPath: string): Promise<string> {
21762180
const resolvedPath = resolve(targetPath)
21772181
const localIntegrations = await this.withLocalMountPaths(this.listConnected(projectId))

0 commit comments

Comments
 (0)