-
Notifications
You must be signed in to change notification settings - Fork 0
Fix integration event local previews #159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| import assert from 'node:assert/strict' | ||
| import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises' | ||
| import { mkdir, mkdtemp, rm, stat, writeFile } from 'node:fs/promises' | ||
| import { tmpdir } from 'node:os' | ||
| import { join } from 'node:path' | ||
| import { beforeEach, test } from 'node:test' | ||
|
|
@@ -15,7 +15,8 @@ import { | |
| localWatchEventPathsForFilename, | ||
| localWatchRootsFor, | ||
| relayfileSdkPathFiltersFor, | ||
| resetIntegrationEventTelemetryForTests | ||
| resetIntegrationEventTelemetryForTests, | ||
| subscriptionSpecsFor | ||
| } from '../integration-event-bridge.ts' | ||
| import type { ConnectedIntegration } from '../integrations.ts' | ||
|
|
||
|
|
@@ -150,6 +151,16 @@ async function waitUntil(predicate: () => boolean, timeoutMs = 2_000): Promise<v | |
| assert.equal(predicate(), true) | ||
| } | ||
|
|
||
| async function waitForPathMissing(path: string, timeoutMs = 2_000): Promise<void> { | ||
| const deadline = Date.now() + timeoutMs | ||
| while (Date.now() <= deadline) { | ||
| const stats = await stat(path).catch(() => null) | ||
| if (!stats) return | ||
| await new Promise((resolve) => setTimeout(resolve, 10)) | ||
| } | ||
| assert.equal(await stat(path).then(() => true).catch(() => false), false) | ||
| } | ||
|
|
||
| function makeHarness( | ||
| agents = ['alice', 'bob'], | ||
| options: { | ||
|
|
@@ -1416,6 +1427,64 @@ test('slack context falls back to expanded event data when targeted remote previ | |
| assert.equal((harness.sent[0].input.data?.contextPreview as { content?: string } | undefined)?.content, undefined) | ||
| }) | ||
|
|
||
| test('slack DM context uses materialized local file when targeted remote preview is missing', async () => { | ||
| const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-event-preview-')) | ||
| const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'users', 'U0ADJH4P83T', 'messages') | ||
| const messagePath = '/slack/users/U0ADJH4P83T/messages/1780905125_300069/meta.json' | ||
| await mkdir(join(localRoot, '1780905125_300069'), { recursive: true }) | ||
| await writeFile( | ||
| join(localRoot, '1780905125_300069', 'meta.json'), | ||
| JSON.stringify({ | ||
| provider: 'slack', | ||
| text: 'local Slack DM context', | ||
| user: 'U123', | ||
| dm_user_id: 'U0ADJH4P83T' | ||
| }) | ||
| ) | ||
|
|
||
| try { | ||
| const harness = makeHarness(['alice'], { failReadFile: true }) | ||
|
|
||
| await withMockedNow('2026-06-05T14:00:00.000Z', async () => { | ||
| await harness.bridge.reconcile('project-1', [ | ||
| integration({ | ||
| provider: 'slack', | ||
| integrationId: 'slack-1', | ||
| mountPaths: ['/slack/users/U0ADJH4P83T/messages'], | ||
| localMountPaths: [localRoot], | ||
| downloadHistoricalData: false, | ||
| scope: { listenDms: true, notifyAgents: ['alice'] } | ||
| }) | ||
| ]) | ||
| }) | ||
|
|
||
| await harness.emit({ | ||
| ...changeEvent(messagePath, 'slack'), | ||
| expand: async () => ({ | ||
| level: 'full', | ||
| path: messagePath, | ||
| data: { | ||
| path: messagePath, | ||
| deleted: false | ||
| } | ||
| }) | ||
| } as ChangeEvent) | ||
| await waitForSent(harness, 1, 2_500) | ||
|
|
||
| assert.match(harness.sent[0].input.text, /Slack message event/u) | ||
| assert.match(harness.sent[0].input.text, /Location: User U0ADJH4P83T/u) | ||
| assert.match(harness.sent[0].input.text, /Author: U123/u) | ||
| assert.match(harness.sent[0].input.text, /Message:\nlocal Slack DM context/u) | ||
| assert.doesNotMatch(harness.sent[0].input.text, /Message: unavailable/u) | ||
| assert.equal( | ||
| (harness.sent[0].input.data?.contextPreview as { path?: string } | undefined)?.path, | ||
| messagePath | ||
| ) | ||
| } finally { | ||
| await rm(workspaceRoot, { recursive: true, force: true }) | ||
| } | ||
| }) | ||
|
|
||
| test('slack context retries targeted remote preview before falling back to sparse event data', async () => { | ||
| const harness = makeHarness(['alice'], { readFileFailuresBeforeSuccess: 1 }) | ||
| const messagePath = '/slack/channels/D123ABC/messages/1780668000_000000/meta.json' | ||
|
|
@@ -1830,6 +1899,30 @@ test('local fallback watchers use the shared Slack users command-root grammar', | |
| ) | ||
| }) | ||
|
|
||
| test('subscription specs include concrete local roots for Slack user message mounts', () => { | ||
| const localRoot = join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'users', 'U123', 'messages') | ||
| const specs = subscriptionSpecsFor( | ||
| [ | ||
| integration({ | ||
| provider: 'slack', | ||
| integrationId: 'slack-1', | ||
| mountPaths: ['/slack/users/U123/messages'], | ||
| localMountPaths: [localRoot], | ||
| scope: { listenDms: true } | ||
| }) | ||
| ], | ||
| 'workspace-id' | ||
| ) | ||
|
|
||
| assert.equal(specs.length, 1) | ||
| assert.deepEqual(specs[0].localMountRoots, [ | ||
| { | ||
| localRoot, | ||
| remoteRoot: '/slack/users/U123/messages' | ||
| } | ||
| ]) | ||
| }) | ||
|
|
||
| test('local watcher path construction does not duplicate remote path segments', () => { | ||
| const messageLocalRoot = join('/tmp', 'relayfile', 'workspaces', 'workspace-id', 'slack', 'channels', 'C0AD7UU0J1G', 'messages', '1780019742_971719') | ||
| const messageRemoteRoot = '/slack/channels/C0AD7UU0J1G/messages/1780019742_971719' | ||
|
|
@@ -2021,6 +2114,68 @@ test('integration events ignore index, discovery, tmp, dotfile, and local writeb | |
| assert.deepEqual(harness.listAgentsCalls, []) | ||
| }) | ||
|
|
||
| test('confirmed Slack writeback success removes the local draft command file', async () => { | ||
| const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-writeback-cleanup-')) | ||
| const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages') | ||
| const localDraftPath = join(localRoot, 'reply-confirmed.json') | ||
| const remoteDraftPath = '/slack/channels/C123ABC/messages/reply-confirmed.json' | ||
| await mkdir(localRoot, { recursive: true }) | ||
| await writeFile(localDraftPath, JSON.stringify({ text: 'confirmed send' })) | ||
|
|
||
| try { | ||
| const harness = makeHarness(['alice']) | ||
| await harness.bridge.reconcile('project-1', [ | ||
| integration({ | ||
| provider: 'slack', | ||
| integrationId: 'slack-1', | ||
| mountPaths: ['/slack/channels/C123ABC/messages'], | ||
| localMountPaths: [localRoot], | ||
| scope: { notifyAgents: ['alice'] } | ||
| }) | ||
| ]) | ||
|
|
||
| await harness.emit({ | ||
| ...changeEvent(remoteDraftPath, 'slack'), | ||
| type: 'writeback.succeeded' | ||
| } as ChangeEvent) | ||
| await waitForPathMissing(localDraftPath) | ||
|
|
||
| assert.deepEqual(harness.sent, []) | ||
| } finally { | ||
| await rm(workspaceRoot, { recursive: true, force: true }) | ||
| } | ||
| }) | ||
|
|
||
| test('Slack writeback draft cleanup waits for confirmed dispatch', async () => { | ||
| const workspaceRoot = await mkdtemp(join(tmpdir(), 'pear-writeback-cleanup-')) | ||
| const localRoot = join(workspaceRoot, 'workspace-id', 'slack', 'channels', 'C123ABC', 'messages') | ||
| const localDraftPath = join(localRoot, 'reply-pending.json') | ||
| const remoteDraftPath = '/slack/channels/C123ABC/messages/reply-pending.json' | ||
| await mkdir(localRoot, { recursive: true }) | ||
| await writeFile(localDraftPath, JSON.stringify({ text: 'pending send' })) | ||
|
|
||
| try { | ||
| const harness = makeHarness(['alice']) | ||
| await harness.bridge.reconcile('project-1', [ | ||
| integration({ | ||
| provider: 'slack', | ||
| integrationId: 'slack-1', | ||
| mountPaths: ['/slack/channels/C123ABC/messages'], | ||
| localMountPaths: [localRoot], | ||
| scope: { notifyAgents: ['alice'] } | ||
| }) | ||
| ]) | ||
|
|
||
| await harness.emit(changeEvent(remoteDraftPath, 'slack')) | ||
| await waitForDispatcherTick() | ||
|
|
||
| assert.equal(await stat(localDraftPath).then(() => true).catch(() => false), true) | ||
| assert.deepEqual(harness.sent, []) | ||
| } finally { | ||
| await rm(workspaceRoot, { recursive: true, force: true }) | ||
| } | ||
| }) | ||
|
Comment on lines
+2117
to
+2177
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add duplicate/replay coverage for confirmed writeback cleanup idempotency. These tests validate confirmed and pending flows, but they don’t assert behavior when As per coding guidelines, 🤖 Prompt for AI AgentsSource: Coding guidelines |
||
|
|
||
| test('integration events notify nested non-numeric Slack message records', async () => { | ||
| const harness = makeHarness() | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,6 @@ | ||||||||
| import { createHash } from 'node:crypto' | ||||||||
| import { existsSync, watch, type FSWatcher } from 'node:fs' | ||||||||
| import { appendFile, mkdir, readFile, stat } from 'node:fs/promises' | ||||||||
| import { appendFile, mkdir, readFile, rm, stat } from 'node:fs/promises' | ||||||||
| import { homedir } from 'node:os' | ||||||||
| import { dirname, isAbsolute, join, relative, resolve, sep } from 'node:path' | ||||||||
| import { | ||||||||
|
|
@@ -568,7 +568,7 @@ function targetLabels(targets: DeliveryTargets): string[] { | |||||||
| return [...targets.agents.map((agent) => `@${agent}`), ...targets.channels] | ||||||||
| } | ||||||||
|
|
||||||||
| function subscriptionSpecsFor( | ||||||||
| export function subscriptionSpecsFor( | ||||||||
| integrations: ConnectedIntegration[], | ||||||||
| localMountWorkspaceId?: string | ||||||||
| ): SubscriptionSpec[] { | ||||||||
|
|
@@ -1869,6 +1869,36 @@ async function readLocalEventContextPreview( | |||||||
| return undefined | ||||||||
| } | ||||||||
|
|
||||||||
| async function cleanupConfirmedSlackWritebackDraft( | ||||||||
| projectId: string, | ||||||||
| event: ChangeEvent, | ||||||||
| specs: SubscriptionSpec[] | ||||||||
| ): Promise<void> { | ||||||||
| if (event.type !== 'writeback.succeeded') return | ||||||||
| const remotePath = eventSummaryValue(event.resource.path) | ||||||||
| if (!remotePath || !isLikelyLocalWritebackCommandPath(remotePath)) return | ||||||||
|
|
||||||||
| for (const spec of specs) { | ||||||||
| if (spec.provider !== 'slack') continue | ||||||||
| for (const root of spec.localMountRoots) { | ||||||||
| if (!isSlackWritebackCommandRoot(root.remoteRoot)) continue | ||||||||
| if (!pathIsInsideMount(remotePath, root.remoteRoot)) continue | ||||||||
|
Comment on lines
+1884
to
+1885
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When
Suggested change
|
||||||||
| const localPath = localPathForRemotePathInsideRoot(root.localRoot, root.remoteRoot, remotePath) | ||||||||
| if (!localPathIsInsideRoot(root.localRoot, localPath)) continue | ||||||||
| const stats = await stat(localPath).catch(() => null) | ||||||||
| if (!stats || stats.isDirectory()) continue | ||||||||
| await rm(localPath, { force: true }) | ||||||||
| logIntegrationEvent('confirmed Slack writeback draft cleaned', { | ||||||||
| projectId, | ||||||||
| eventId: event.id, | ||||||||
| remotePath, | ||||||||
| localRoot: root.localRoot | ||||||||
| }) | ||||||||
| return | ||||||||
|
Comment on lines
+1891
to
+1897
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning early after the first successful deletion prevents the cleanup of duplicate draft files in other matching local mount roots (for example, if the same channel is mounted in multiple local workspaces or roots). Removing the logIntegrationEvent('confirmed Slack writeback draft cleaned', {\n projectId,\n eventId: event.id,\n remotePath,\n localRoot: root.localRoot\n }) |
||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| function slackScopeLabel(path: string): string | undefined { | ||||||||
| const segments = pathSegments(path) | ||||||||
| const channelIndex = segments.indexOf('channels') | ||||||||
|
|
@@ -2331,6 +2361,18 @@ export class IntegrationEventBridge { | |||||||
| type: event.type, | ||||||||
| path: event.resource.path | ||||||||
| }) | ||||||||
| void cleanupConfirmedSlackWritebackDraft(projectId, event, specs).catch((error) => { | ||||||||
| warnIntegrationEventAggregated( | ||||||||
| `confirmed writeback cleanup failed:${projectId}`, | ||||||||
| 'confirmed writeback cleanup failed', | ||||||||
| { | ||||||||
| projectId, | ||||||||
| eventId: event.id, | ||||||||
| path: event.resource.path, | ||||||||
| error: toErrorMessage(error) | ||||||||
| } | ||||||||
| ) | ||||||||
| }) | ||||||||
| void this.enqueueEvent(projectId, event, specs, { | ||||||||
| source: 'remote', | ||||||||
| subscriptionStartedAtMs: remoteSubscriptionStartedAtMs, | ||||||||
|
|
||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid swallowing non-ENOENT errors in
waitForPathMissing.On Line 157,
catch(() => null)converts everystatfailure into “missing,” which can falsely pass tests on permission/IO errors.Suggested fix
async function waitForPathMissing(path: string, timeoutMs = 2_000): Promise<void> { const deadline = Date.now() + timeoutMs while (Date.now() <= deadline) { - const stats = await stat(path).catch(() => null) + const stats = await stat(path).catch((error: NodeJS.ErrnoException) => { + if (error?.code === 'ENOENT') return null + throw error + }) if (!stats) return await new Promise((resolve) => setTimeout(resolve, 10)) } - assert.equal(await stat(path).then(() => true).catch(() => false), false) + const exists = await stat(path) + .then(() => true) + .catch((error: NodeJS.ErrnoException) => { + if (error?.code === 'ENOENT') return false + throw error + }) + assert.equal(exists, false) }📝 Committable suggestion
🤖 Prompt for AI Agents