Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 157 additions & 2 deletions src/main/__tests__/integration-event-bridge.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import assert from 'node:assert/strict'
import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises'
import { mkdir, mkdtemp, rm, stat, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { beforeEach, test } from 'node:test'
Expand All @@ -15,7 +15,8 @@ import {
localWatchEventPathsForFilename,
localWatchRootsFor,
relayfileSdkPathFiltersFor,
resetIntegrationEventTelemetryForTests
resetIntegrationEventTelemetryForTests,
subscriptionSpecsFor
} from '../integration-event-bridge.ts'
import type { ConnectedIntegration } from '../integrations.ts'

Expand Down Expand Up @@ -150,6 +151,16 @@ async function waitUntil(predicate: () => boolean, timeoutMs = 2_000): Promise<v
assert.equal(predicate(), true)
}

async function waitForPathMissing(path: string, timeoutMs = 2_000): Promise<void> {
const deadline = Date.now() + timeoutMs
while (Date.now() <= deadline) {
const stats = await stat(path).catch(() => null)
if (!stats) return
await new Promise((resolve) => setTimeout(resolve, 10))
}
assert.equal(await stat(path).then(() => true).catch(() => false), false)
}
Comment on lines +154 to +162
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Avoid swallowing non-ENOENT errors in waitForPathMissing.

On Line 157, catch(() => null) converts every stat failure into “missing,” which can falsely pass tests on permission/IO errors.

Suggested fix
 async function waitForPathMissing(path: string, timeoutMs = 2_000): Promise<void> {
   const deadline = Date.now() + timeoutMs
   while (Date.now() <= deadline) {
-    const stats = await stat(path).catch(() => null)
+    const stats = await stat(path).catch((error: NodeJS.ErrnoException) => {
+      if (error?.code === 'ENOENT') return null
+      throw error
+    })
     if (!stats) return
     await new Promise((resolve) => setTimeout(resolve, 10))
   }
-  assert.equal(await stat(path).then(() => true).catch(() => false), false)
+  const exists = await stat(path)
+    .then(() => true)
+    .catch((error: NodeJS.ErrnoException) => {
+      if (error?.code === 'ENOENT') return false
+      throw error
+    })
+  assert.equal(exists, false)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async function waitForPathMissing(path: string, timeoutMs = 2_000): Promise<void> {
const deadline = Date.now() + timeoutMs
while (Date.now() <= deadline) {
const stats = await stat(path).catch(() => null)
if (!stats) return
await new Promise((resolve) => setTimeout(resolve, 10))
}
assert.equal(await stat(path).then(() => true).catch(() => false), false)
}
async function waitForPathMissing(path: string, timeoutMs = 2_000): Promise<void> {
const deadline = Date.now() + timeoutMs
while (Date.now() <= deadline) {
const stats = await stat(path).catch((error: NodeJS.ErrnoException) => {
if (error?.code === 'ENOENT') return null
throw error
})
if (!stats) return
await new Promise((resolve) => setTimeout(resolve, 10))
}
const exists = await stat(path)
.then(() => true)
.catch((error: NodeJS.ErrnoException) => {
if (error?.code === 'ENOENT') return false
throw error
})
assert.equal(exists, false)
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/__tests__/integration-event-bridge.test.ts` around lines 154 - 162,
The helper waitForPathMissing currently swallows all errors from stat (catch(()
=> null)), which treats permission/IO errors as “missing”; change the catch to
only treat ENOENT as missing and rethrow any other errors: in waitForPathMissing
wrap the stat call with a catch that checks err.code === 'ENOENT' then returns
null, otherwise rethrow the error so tests fail on non-ENOENT issues; reference
the waitForPathMissing function and the stat call to locate the change.


function makeHarness(
agents = ['alice', 'bob'],
options: {
Expand Down Expand Up @@ -1416,6 +1427,64 @@ test('slack context falls back to expanded event data when targeted remote previ
assert.equal((harness.sent[0].input.data?.contextPreview as { content?: string } | undefined)?.content, undefined)
})

test('slack DM context uses materialized local file when targeted remote preview is missing', async () => {
const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-event-preview-'))
const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'users', 'U0ADJH4P83T', 'messages')
const messagePath = '/slack/users/U0ADJH4P83T/messages/1780905125_300069/meta.json'
await mkdir(join(localRoot, '1780905125_300069'), { recursive: true })
await writeFile(
join(localRoot, '1780905125_300069', 'meta.json'),
JSON.stringify({
provider: 'slack',
text: 'local Slack DM context',
user: 'U123',
dm_user_id: 'U0ADJH4P83T'
})
)

try {
const harness = makeHarness(['alice'], { failReadFile: true })

await withMockedNow('2026-06-05T14:00:00.000Z', async () => {
await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/users/U0ADJH4P83T/messages'],
localMountPaths: [localRoot],
downloadHistoricalData: false,
scope: { listenDms: true, notifyAgents: ['alice'] }
})
])
})

await harness.emit({
...changeEvent(messagePath, 'slack'),
expand: async () => ({
level: 'full',
path: messagePath,
data: {
path: messagePath,
deleted: false
}
})
} as ChangeEvent)
await waitForSent(harness, 1, 2_500)

assert.match(harness.sent[0].input.text, /Slack message event/u)
assert.match(harness.sent[0].input.text, /Location: User U0ADJH4P83T/u)
assert.match(harness.sent[0].input.text, /Author: U123/u)
assert.match(harness.sent[0].input.text, /Message:\nlocal Slack DM context/u)
assert.doesNotMatch(harness.sent[0].input.text, /Message: unavailable/u)
assert.equal(
(harness.sent[0].input.data?.contextPreview as { path?: string } | undefined)?.path,
messagePath
)
} finally {
await rm(workspaceRoot, { recursive: true, force: true })
}
})

test('slack context retries targeted remote preview before falling back to sparse event data', async () => {
const harness = makeHarness(['alice'], { readFileFailuresBeforeSuccess: 1 })
const messagePath = '/slack/channels/D123ABC/messages/1780668000_000000/meta.json'
Expand Down Expand Up @@ -1830,6 +1899,30 @@ test('local fallback watchers use the shared Slack users command-root grammar',
)
})

test('subscription specs include concrete local roots for Slack user message mounts', () => {
const localRoot = join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'users', 'U123', 'messages')
const specs = subscriptionSpecsFor(
[
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/users/U123/messages'],
localMountPaths: [localRoot],
scope: { listenDms: true }
})
],
'workspace-id'
)

assert.equal(specs.length, 1)
assert.deepEqual(specs[0].localMountRoots, [
{
localRoot,
remoteRoot: '/slack/users/U123/messages'
}
])
})

test('local watcher path construction does not duplicate remote path segments', () => {
const messageLocalRoot = join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'channels', 'C0AD7UU0J1G', 'messages', '1780019742_971719')
const messageRemoteRoot = '/slack/channels/C0AD7UU0J1G/messages/1780019742_971719'
Expand Down Expand Up @@ -2021,6 +2114,68 @@ test('integration events ignore index, discovery, tmp, dotfile, and local writeb
assert.deepEqual(harness.listAgentsCalls, [])
})

test('confirmed Slack writeback success removes the local draft command file', async () => {
const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-writeback-cleanup-'))
const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages')
const localDraftPath = join(localRoot, 'reply-confirmed.json')
const remoteDraftPath = '/slack/channels/C123ABC/messages/reply-confirmed.json'
await mkdir(localRoot, { recursive: true })
await writeFile(localDraftPath, JSON.stringify({ text: 'confirmed send' }))

try {
const harness = makeHarness(['alice'])
await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC/messages'],
localMountPaths: [localRoot],
scope: { notifyAgents: ['alice'] }
})
])

await harness.emit({
...changeEvent(remoteDraftPath, 'slack'),
type: 'writeback.succeeded'
} as ChangeEvent)
await waitForPathMissing(localDraftPath)

assert.deepEqual(harness.sent, [])
} finally {
await rm(workspaceRoot, { recursive: true, force: true })
}
})

test('Slack writeback draft cleanup waits for confirmed dispatch', async () => {
const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-writeback-cleanup-'))
const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages')
const localDraftPath = join(localRoot, 'reply-pending.json')
const remoteDraftPath = '/slack/channels/C123ABC/messages/reply-pending.json'
await mkdir(localRoot, { recursive: true })
await writeFile(localDraftPath, JSON.stringify({ text: 'pending send' }))

try {
const harness = makeHarness(['alice'])
await harness.bridge.reconcile('project-1', [
integration({
provider: 'slack',
integrationId: 'slack-1',
mountPaths: ['/slack/channels/C123ABC/messages'],
localMountPaths: [localRoot],
scope: { notifyAgents: ['alice'] }
})
])

await harness.emit(changeEvent(remoteDraftPath, 'slack'))
await waitForDispatcherTick()

assert.equal(await stat(localDraftPath).then(() => true).catch(() => false), true)
assert.deepEqual(harness.sent, [])
} finally {
await rm(workspaceRoot, { recursive: true, force: true })
}
})
Comment on lines +2117 to +2177
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add duplicate/replay coverage for confirmed writeback cleanup idempotency.

These tests validate confirmed and pending flows, but they don’t assert behavior when writeback.succeeded is replayed for the same draft (common on event streams). Add a second identical confirmed event and verify no error/no side effects after the first deletion.

As per coding guidelines, **/*.test.{js,ts,tsx} changes touching event streaming or integration notifications should include duplicate/replay cases, not only happy-path flow.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/__tests__/integration-event-bridge.test.ts` around lines 2117 -
2177, Add a duplicate/replay check to the "confirmed Slack writeback success
removes the local draft command file" test: after the first harness.emit of the
writeback.succeeded ChangeEvent (remoteDraftPath) and waitForPathMissing, emit
the identical writeback.succeeded event again via harness.emit (same ChangeEvent
payload) and assert the local draft still does not exist (waitForPathMissing or
stat check) and harness.sent remains [] to ensure idempotent cleanup; update
assertions around localDraftPath, remoteDraftPath, harness.emit and
waitForPathMissing to reflect the replayed event case.

Source: Coding guidelines


test('integration events notify nested non-numeric Slack message records', async () => {
const harness = makeHarness()

Expand Down
46 changes: 44 additions & 2 deletions src/main/integration-event-bridge.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createHash } from 'node:crypto'
import { existsSync, watch, type FSWatcher } from 'node:fs'
import { appendFile, mkdir, readFile, stat } from 'node:fs/promises'
import { appendFile, mkdir, readFile, rm, stat } from 'node:fs/promises'
import { homedir } from 'node:os'
import { dirname, isAbsolute, join, relative, resolve, sep } from 'node:path'
import {
Expand Down Expand Up @@ -568,7 +568,7 @@ function targetLabels(targets: DeliveryTargets): string[] {
return [...targets.agents.map((agent) => `@${agent}`), ...targets.channels]
}

function subscriptionSpecsFor(
export function subscriptionSpecsFor(
integrations: ConnectedIntegration[],
localMountWorkspaceId?: string
): SubscriptionSpec[] {
Expand Down Expand Up @@ -1869,6 +1869,36 @@ async function readLocalEventContextPreview(
return undefined
}

async function cleanupConfirmedSlackWritebackDraft(
projectId: string,
event: ChangeEvent,
specs: SubscriptionSpec[]
): Promise<void> {
if (event.type !== 'writeback.succeeded') return
const remotePath = eventSummaryValue(event.resource.path)
if (!remotePath || !isLikelyLocalWritebackCommandPath(remotePath)) return

for (const spec of specs) {
if (spec.provider !== 'slack') continue
for (const root of spec.localMountRoots) {
if (!isSlackWritebackCommandRoot(root.remoteRoot)) continue
if (!pathIsInsideMount(remotePath, root.remoteRoot)) continue
Comment on lines +1884 to +1885
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When downloadHistoricalData is enabled (true), the root.remoteRoot is a broader history root (e.g., /slack/channels/C123) rather than the narrow command root (e.g., /slack/channels/C123/messages). This causes isSlackWritebackCommandRoot(root.remoteRoot) to return false, completely bypassing the cleanup of confirmed writeback drafts for users with historical download enabled.\n\nSince isLikelyLocalWritebackCommandPath(remotePath) and pathIsInsideMount(remotePath, root.remoteRoot) already ensure the path is a valid writeback command file within the mount, this extra check is unnecessary and buggy.

Suggested change
if (!isSlackWritebackCommandRoot(root.remoteRoot)) continue
if (!pathIsInsideMount(remotePath, root.remoteRoot)) continue
if (!pathIsInsideMount(remotePath, root.remoteRoot)) continue

const localPath = localPathForRemotePathInsideRoot(root.localRoot, root.remoteRoot, remotePath)
if (!localPathIsInsideRoot(root.localRoot, localPath)) continue
const stats = await stat(localPath).catch(() => null)
if (!stats || stats.isDirectory()) continue
await rm(localPath, { force: true })
logIntegrationEvent('confirmed Slack writeback draft cleaned', {
projectId,
eventId: event.id,
remotePath,
localRoot: root.localRoot
})
return
Comment on lines +1891 to +1897
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Returning early after the first successful deletion prevents the cleanup of duplicate draft files in other matching local mount roots (for example, if the same channel is mounted in multiple local workspaces or roots). Removing the return statement allows the loop to continue and clean up all matching local draft files.

      logIntegrationEvent('confirmed Slack writeback draft cleaned', {\n        projectId,\n        eventId: event.id,\n        remotePath,\n        localRoot: root.localRoot\n      })

}
}
}

function slackScopeLabel(path: string): string | undefined {
const segments = pathSegments(path)
const channelIndex = segments.indexOf('channels')
Expand Down Expand Up @@ -2331,6 +2361,18 @@ export class IntegrationEventBridge {
type: event.type,
path: event.resource.path
})
void cleanupConfirmedSlackWritebackDraft(projectId, event, specs).catch((error) => {
warnIntegrationEventAggregated(
`confirmed writeback cleanup failed:${projectId}`,
'confirmed writeback cleanup failed',
{
projectId,
eventId: event.id,
path: event.resource.path,
error: toErrorMessage(error)
}
)
})
void this.enqueueEvent(projectId, event, specs, {
source: 'remote',
subscriptionStartedAtMs: remoteSubscriptionStartedAtMs,
Expand Down
32 changes: 32 additions & 0 deletions src/main/integrations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,38 @@ describe('IntegrationsManager', () => {
)
})

it('passes current local mount paths into integration event reconciliation', async () => {
mock.store.projects[0].integrations[0] = {
...mock.store.projects[0].integrations[0],
mountPaths: ['/slack/users/U0ADJH4P83T/messages'],
scope: { listenDms: true },
subscribeAgent: true
}
mock.integrationMountManager.currentWorkspaceId.mockReturnValue('workspace-id')
mock.integrationMountManager.localPathsFor.mockReturnValue([
'/tmp/relayfile/workspace-id/slack/users/U0ADJH4P83T/messages'
])
const manager = new IntegrationsManager()

await manager.refreshAgentState('project-1')

expect(mock.integrationEventBridge.reconcile).toHaveBeenCalledWith(
'project-1',
[
expect.objectContaining({
provider: 'slack',
integrationId: 'slack-integration-1',
mountPaths: ['/slack/users/U0ADJH4P83T/messages'],
localMountPaths: ['/tmp/relayfile/workspace-id/slack/users/U0ADJH4P83T/messages']
})
]
)
expect(mock.integrationMountManager.localPathsFor).toHaveBeenCalledWith('workspace-id', {
provider: 'slack',
mountPaths: ['/discovery/slack', '/slack/users/U0ADJH4P83T/messages']
})
})

it('retries active integration event subscriptions after startup mount hydration', async () => {
mock.store.projects[0].integrations[0].subscribeAgent = true
const manager = new IntegrationsManager()
Expand Down
14 changes: 9 additions & 5 deletions src/main/integrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2028,7 +2028,7 @@ export class IntegrationsManager {
await integrationEventBridge.closeAllExcept(projectId)
await integrationEventBridge.reconcile(
projectId,
this.visibleIntegrationsForProject(projectId)
this.withCurrentLocalMountPaths(this.visibleIntegrationsForProject(projectId))
)
return true
} catch (error) {
Expand Down Expand Up @@ -2155,10 +2155,7 @@ export class IntegrationsManager {
}))
}

private async withLocalMountPaths(integrations: ConnectedIntegration[]): Promise<ConnectedIntegration[]> {
await this.syncLocalMounts({ hydrateCloud: false }).catch((error) => {
if (!isIntegrationAuthRecoveryError(error)) throw error
})
private withCurrentLocalMountPaths(integrations: ConnectedIntegration[]): ConnectedIntegration[] {
const workspaceId = integrationMountManager.currentWorkspaceId()
if (!workspaceId) return integrations
return integrations.map((integration) => {
Expand All @@ -2172,6 +2169,13 @@ export class IntegrationsManager {
})
}

private async withLocalMountPaths(integrations: ConnectedIntegration[]): Promise<ConnectedIntegration[]> {
await this.syncLocalMounts({ hydrateCloud: false }).catch((error) => {
if (!isIntegrationAuthRecoveryError(error)) throw error
})
return this.withCurrentLocalMountPaths(integrations)
}

private async resolveIntegrationMountPath(projectId: string, integrationId: string, targetPath: string): Promise<string> {
const resolvedPath = resolve(targetPath)
const localIntegrations = await this.withLocalMountPaths(this.listConnected(projectId))
Expand Down
Loading