diff --git a/src/main/__tests__/integration-event-bridge.test.ts b/src/main/__tests__/integration-event-bridge.test.ts index 38d36277..801e564b 100644 --- a/src/main/__tests__/integration-event-bridge.test.ts +++ b/src/main/__tests__/integration-event-bridge.test.ts @@ -1,4 +1,6 @@ import assert from 'node:assert/strict' +import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises' +import { tmpdir } from 'node:os' import { join } from 'node:path' import { beforeEach, test } from 'node:test' @@ -873,6 +875,159 @@ test('slack raw-id event resolves context through mounted slug alias', async () assert.match(harness.sent[1].input.text, /Message:\nedited Slack message/u) }) +test('slack raw-id event falls back to matched local suffixed mount when remote read misses', async () => { + const tempRoot = await mkdtemp(join(tmpdir(), 'pear-slack-local-context-')) + const localRoot = join(tempRoot, 'workspace-id', 'slack', 'channels', 'C123ABC__proj-cloud') + const remotePath = '/slack/channels/C123ABC/messages/1780668000_000000/meta.json' + const localRemotePath = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/meta.json' + + try { + await mkdir(join(localRoot, 'messages', '1780668000_000000'), { recursive: true }) + await writeFile( + join(localRoot, 'messages', '1780668000_000000', 'meta.json'), + JSON.stringify({ provider: 'slack', text: 'local mounted Slack message' }) + ) + const harness = makeHarness(['alice'], { + failReadFile: true + }) + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + localMountPaths: [localRoot], + downloadHistoricalData: false, + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + await harness.emit({ + ...changeEvent( + remotePath, + 'slack', + { digest: 'revision:raw-copy' } + ), + expand: async () => ({ + level: 'full', + path: remotePath, + data: { + path: remotePath, + deleted: false + } + }) + } as ChangeEvent) + await waitForSent(harness, 1, 2_500) + + assert.match(harness.sent[0].input.text, /Message:\nlocal mounted Slack message/u) + assert.match(harness.sent[0].input.text, /Path: \.integrations\/slack\/channels\/C123ABC__proj-cloud\/messages\/1780668000_000000\/meta\.json/u) + assert.equal(harness.sent[0].input.data?.path, localRemotePath) + assert.deepEqual( + (harness.sent[0].input.data?.resource as { path?: string } | undefined)?.path, + localRemotePath + ) + assert.equal((harness.sent[0].input.data?.contextPreview as { path?: string } | undefined)?.path, localRemotePath) + assert.deepEqual(harness.readFileCalls.slice(0, 2), [ + { + workspaceId: 'workspace-id', + path: remotePath + }, + { + workspaceId: 'workspace-id', + path: localRemotePath + } + ]) + + await harness.emit(changeEvent( + localRemotePath, + 'slack', + { digest: 'revision:slug-copy' } + )) + await waitForDropped('project-1', 1, 2_500) + + assert.equal(harness.sent.length, 1) + assert.deepEqual(harness.readFileCalls.slice(8, 10), [ + { + workspaceId: 'workspace-id', + path: localRemotePath + }, + { + workspaceId: 'workspace-id', + path: remotePath + } + ]) + } finally { + await rm(tempRoot, { recursive: true, force: true }) + } +}) + +test('slack local context fallback rejects traversal outside matched mount root', async () => { + const tempRoot = await mkdtemp(join(tmpdir(), 'pear-slack-local-traversal-')) + const localRoot = join(tempRoot, 'workspace-id', 'slack', 'channels', 'C123ABC__proj-cloud') + const escapedRoot = join(tempRoot, 'workspace-id', 'slack', 'leaked') + const remotePath = '/slack/channels/C123ABC/messages/1780668000_000000/../../../../leaked/meta.json' + const localRemotePath = '/slack/channels/C123ABC__proj-cloud/messages/1780668000_000000/../../../../leaked/meta.json' + + try { + await mkdir(escapedRoot, { recursive: true }) + await writeFile( + join(escapedRoot, 'meta.json'), + JSON.stringify({ provider: 'slack', text: 'escaped Slack message' }) + ) + const harness = makeHarness(['alice'], { + failReadFile: true + }) + + await withMockedNow('2026-06-05T14:00:00.000Z', async () => { + await harness.bridge.reconcile('project-1', [ + integration({ + provider: 'slack', + integrationId: 'slack-1', + mountPaths: ['/slack/channels/C123ABC__proj-cloud'], + localMountPaths: [localRoot], + downloadHistoricalData: false, + scope: { notifyAgents: ['alice'] } + }) + ]) + }) + + await harness.emit({ + ...changeEvent( + remotePath, + 'slack', + { digest: 'revision:traversal-copy' } + ), + expand: async () => ({ + level: 'full', + path: remotePath, + data: { + path: remotePath, + deleted: false + } + }) + } as ChangeEvent) + await waitForSent(harness, 1, 2_500) + + assert.doesNotMatch(harness.sent[0].input.text, /escaped Slack message/u) + assert.match(harness.sent[0].input.text, /Message: unavailable/u) + assert.equal(harness.sent[0].input.data?.contextPreview, undefined) + assert.deepEqual(harness.readFileCalls.slice(0, 2), [ + { + workspaceId: 'workspace-id', + path: remotePath + }, + { + workspaceId: 'workspace-id', + path: localRemotePath + } + ]) + } finally { + await rm(tempRoot, { recursive: true, force: true }) + } +}) + test('slack unchanged-content replay re-drives after injected delivery is not confirmed', async () => { const options = { failInjected: true } const harness = makeHarness(['alice'], options) diff --git a/src/main/integration-event-bridge.ts b/src/main/integration-event-bridge.ts index 7f96e3c2..2e72759b 100644 --- a/src/main/integration-event-bridge.ts +++ b/src/main/integration-event-bridge.ts @@ -1,6 +1,6 @@ import { createHash } from 'node:crypto' import { existsSync, watch, type FSWatcher } from 'node:fs' -import { appendFile, mkdir, stat } from 'node:fs/promises' +import { appendFile, mkdir, readFile, stat } from 'node:fs/promises' import { homedir } from 'node:os' import { dirname, isAbsolute, join, relative, resolve, sep } from 'node:path' import { @@ -77,12 +77,18 @@ type SubscriptionSpec = { integrationId: string provider: string mountPaths: string[] + localMountRoots: LocalMountRoot[] eventPathGlobs: string[] watches: WatchRegistration[] targets: DeliveryTargets allowHistoricalReplay: boolean } +type LocalMountRoot = { + localRoot: string + remoteRoot: string +} + type ProjectSubscription = { subscriptions: Subscription[] signature: string @@ -534,7 +540,10 @@ function targetLabels(targets: DeliveryTargets): string[] { return [...targets.agents.map((agent) => `@${agent}`), ...targets.channels] } -function subscriptionSpecsFor(integrations: ConnectedIntegration[]): SubscriptionSpec[] { +function subscriptionSpecsFor( + integrations: ConnectedIntegration[], + localMountWorkspaceId?: string +): SubscriptionSpec[] { return integrations.map((integration) => { const mountPaths = canonicalMountPaths(integration) const eventPathGlobs = eventPathGlobsForIntegration(integration) @@ -542,6 +551,9 @@ function subscriptionSpecsFor(integrations: ConnectedIntegration[]): Subscriptio integrationId: integration.integrationId, provider: integration.provider, mountPaths, + localMountRoots: localMountWorkspaceId + ? concreteLocalMountRootsForIntegration(localMountWorkspaceId, integration, mountPaths) + : [], eventPathGlobs, watches: eventPathGlobs.map((glob) => ({ glob, @@ -950,6 +962,34 @@ function hasWatchableLocalIntegrationFor( ) } +function concreteLocalMountRootsForIntegration( + workspaceId: string, + integration: ConnectedIntegration, + mountPaths: string[] +): LocalMountRoot[] { + const roots = new Map() + const addRoot = (localRoot: string, remoteRoot: string): void => { + if (remoteRoot.includes('*')) return + if (!mountPaths.some((mountPath) => + pathIsInsideMount(remoteRoot, mountPath) || pathIsInsideMount(mountPath, remoteRoot) + )) { + return + } + const normalizedLocalRoot = resolve(localRoot) + roots.set(`${remoteRoot}:${normalizedLocalRoot}`, { + localRoot: normalizedLocalRoot, + remoteRoot + }) + } + + for (const localRoot of integration.localMountPaths || []) { + const remoteRoot = remoteRootForLocalMountPath(workspaceId, localRoot) + if (remoteRoot) addRoot(localRoot, remoteRoot) + } + + return Array.from(roots.values()) +} + function remoteRootForWatchGlob(glob: string): string | null { const trimmed = glob.trim() if (!trimmed.startsWith('/')) return null @@ -1025,6 +1065,11 @@ function localPathForRemotePathInsideRoot(localRoot: string, remoteRoot: string, return tail === '/' ? resolve(localRoot) : join(resolve(localRoot), ...pathSegments(tail)) } +function localPathIsInsideRoot(localRoot: string, localPath: string): boolean { + const relativePath = relative(resolve(localRoot), resolve(localPath)) + return relativePath === '' || (!!relativePath && !relativePath.startsWith('..') && !isAbsolute(relativePath)) +} + export function localWatchEventPathsForFilename( localRoot: string, remoteRoot: string, @@ -1746,6 +1791,56 @@ function slackContextReadCandidatePaths(path: string, specs: SubscriptionSpec[]) return dedupeStringsInOrder(candidates) } +function isSuffixedSlackChannelPath(path: string): boolean { + return /^\/slack\/channels\/[^/]+__[^/]+\//u.test(path) +} + +function resolvedSlackContextPath(path: string, specs: SubscriptionSpec[]): string { + const normalizedPath = path.startsWith('/') ? path : `/${path}` + if (!slackEventContextPath(normalizedPath)) return normalizedPath + const candidates = slackContextReadCandidatePaths(normalizedPath, specs) + return candidates.find((candidate) => + isSuffixedSlackChannelPath(candidate) && + specs.some((spec) => + spec.mountPaths.some((mountPath) => pathIsInsideMount(candidate, mountPath)) + ) + ) || candidates[0] || normalizedPath +} + +function contentTypeForLocalPath(localPath: string): string | undefined { + if (/\.json$/u.test(localPath)) return 'application/json' + if (/\.(?:txt|md|markdown)$/u.test(localPath)) return 'text/plain' + return undefined +} + +async function readLocalEventContextPreview( + remotePath: string, + specs: SubscriptionSpec[] +): Promise { + for (const spec of specs) { + for (const root of spec.localMountRoots) { + if (!pathIsInsideMount(remotePath, root.remoteRoot)) continue + const localPath = localPathForRemotePathInsideRoot(root.localRoot, root.remoteRoot, remotePath) + if (!localPathIsInsideRoot(root.localRoot, localPath)) continue + const stats = await stat(localPath).catch(() => null) + if (!stats || stats.isDirectory()) continue + try { + const buffer = await readFile(localPath) + logIntegrationEvent('event context local fallback read', { + integrationId: spec.integrationId, + remotePath, + localRoot: root.localRoot + }) + return eventContextPreviewFromBuffer(remotePath, buffer, contentTypeForLocalPath(localPath)) + } catch { + // Try the next matched concrete root/candidate. Missing or transient local + // reads should not mask the remote expand fallback. + } + } + } + return undefined +} + function slackScopeLabel(path: string): string | undefined { const segments = pathSegments(path) const channelIndex = segments.indexOf('channels') @@ -1761,14 +1856,15 @@ function slackScopeLabel(path: string): string | undefined { function formatSlackIntegrationEventMessage( event: ChangeEvent, - contextPreview?: EventContextPreview + contextPreview?: EventContextPreview, + resolvedPath?: string ): string | null { const resource = isRecord(event.resource) ? event.resource : {} const provider = eventSummaryValue(resource.provider) || eventProvider(event) const relayfilePath = eventSummaryValue(resource.path) if (provider !== 'slack' || !relayfilePath || !slackEventContextPath(relayfilePath)) return null - const contextPath = contextPreview?.path || relayfilePath + const contextPath = contextPreview?.path || resolvedPath || relayfilePath const projectPath = projectIntegrationPathForRelayfilePath(contextPath) const scopeLabel = slackScopeLabel(contextPath) const messageText = slackPreviewText(contextPreview) @@ -1796,16 +1892,18 @@ function formatSlackIntegrationEventMessage( function formatIntegrationEventMessage( event: ChangeEvent, - contextPreview?: EventContextPreview + contextPreview?: EventContextPreview, + resolvedPath?: string ): string { - const slackMessage = formatSlackIntegrationEventMessage(event, contextPreview) + const slackMessage = formatSlackIntegrationEventMessage(event, contextPreview, resolvedPath) if (slackMessage) return slackMessage const summary = isRecord(event.summary) ? event.summary : {} const resource = isRecord(event.resource) ? event.resource : {} const provider = eventSummaryValue(resource.provider) || 'integration' const relayfilePath = eventSummaryValue(resource.path) - const projectPath = relayfilePath ? projectIntegrationPathForRelayfilePath(relayfilePath) : undefined + const displayPath = resolvedPath || relayfilePath + const projectPath = displayPath ? projectIntegrationPathForRelayfilePath(displayPath) : undefined const resourceKind = eventSummaryValue(resource.kind) const resourceId = eventSummaryValue(resource.id) const title = eventSummaryValue(summary.title) @@ -1833,8 +1931,8 @@ function formatIntegrationEventMessage( if (actor) lines.push(`Actor: ${actor}`) if (fieldsChanged) lines.push(`Fields changed: ${fieldsChanged}`) if (labels) lines.push(`Labels: ${labels}`) - if (relayfilePath) { - lines.push(`Targeted context path: ${relayfilePath}`) + if (displayPath) { + lines.push(`Targeted context path: ${displayPath}`) } if (contextPreview) { if (contextPreview.kind === 'text') { @@ -2142,7 +2240,8 @@ export class IntegrationEventBridge { return } - const specs = subscriptionSpecsFor(subscribed) + const handle = await this.getWorkspaceHandle() + const specs = subscriptionSpecsFor(subscribed, handle.localMountWorkspaceId) const watches = dedupeStrings(specs.flatMap((spec) => spec.watches.map((watch) => watch.glob))).map((glob) => ({ glob, coalesceMs: 750 @@ -2152,7 +2251,6 @@ export class IntegrationEventBridge { return } - const handle = await this.getWorkspaceHandle() const signature = JSON.stringify({ workspaceId: handle.workspaceId, localMountWorkspaceId: handle.localMountWorkspaceId, @@ -2161,6 +2259,7 @@ export class IntegrationEventBridge { integrationId: spec.integrationId, provider: spec.provider, mountPaths: spec.mountPaths, + localMountRoots: spec.localMountRoots, eventPathGlobs: spec.eventPathGlobs, allowHistoricalReplay: spec.allowHistoricalReplay, targets: spec.targets @@ -2186,6 +2285,7 @@ export class IntegrationEventBridge { integrationId: spec.integrationId, provider: spec.provider, mountPaths: spec.mountPaths, + localMountRoots: spec.localMountRoots, eventPathGlobs: spec.eventPathGlobs, allowHistoricalReplay: spec.allowHistoricalReplay, targets: targetLabels(spec.targets) @@ -2340,6 +2440,10 @@ export class IntegrationEventBridge { } } } + for (const candidatePath of candidatePaths) { + const localPreview = await readLocalEventContextPreview(candidatePath, matchedSpecs) + if (localPreview) return localPreview + } } catch (error) { readFileError = error } @@ -2450,6 +2554,7 @@ export class IntegrationEventBridge { const eventMetadata = integrationEventMetadata(event) const contextPreview = await this.readEventContextPreview(projectId, event, matchedSpecs) + const resolvedPath = contextPreview?.path || resolvedSlackContextPath(event.resource.path, matchedSpecs) const usesConcreteAgentTargets = uniqueRecipients.every((recipient) => !recipient.startsWith('#')) const canTrackInjectedDelivery = usesConcreteAgentTargets && typeof bridge.sendMessageAndWaitForInjected === 'function' const shouldTrackDedupe = canTrackInjectedDelivery @@ -2515,10 +2620,13 @@ export class IntegrationEventBridge { dedupeClaimed = false } const contextPreviewData = contextPreview ? eventContextPreviewMetadata(contextPreview) : undefined + const resolvedResource = isRecord(event.resource) + ? { ...event.resource, path: resolvedPath } + : undefined logIntegrationEvent('injecting', { projectId, eventId: event.id, - path: event.resource.path, + path: resolvedPath, recipients: uniqueRecipients }) let deliveredCount = 0 @@ -2528,7 +2636,7 @@ export class IntegrationEventBridge { const input = { to: recipient, from: 'integration', - text: formatIntegrationEventMessage(event, contextPreview), + text: formatIntegrationEventMessage(event, contextPreview, resolvedPath), priority: 0, mode: 'steer', data: { @@ -2537,8 +2645,8 @@ export class IntegrationEventBridge { eventId: event.id, eventType: event.type, occurredAt: event.occurredAt, - resource: isRecord(event.resource) ? { ...event.resource } : undefined, - path: event.resource.path, + resource: resolvedResource, + path: resolvedPath, contextPreview: contextPreviewData, ...eventMetadata }