From dcfd647c472e4fcabc7c401336406f545bcdab82 Mon Sep 17 00:00:00 2001 From: Samuel Clay Date: Sun, 14 Jun 2026 09:09:52 -0400 Subject: [PATCH 1/2] fix(slackbot): route thread mentions through DM --- services/slackbot/src/centaur/handoff.test.ts | 108 +++++++ services/slackbot/src/centaur/handoff.ts | 92 +++++- services/slackbot/src/index.test.ts | 139 ++++++++ services/slackbot/src/index.ts | 31 +- .../slackbot/src/slack/thread-routing.test.ts | 196 +++++++++++ services/slackbot/src/slack/thread-routing.ts | 303 ++++++++++++++++++ services/slackbot/src/slack/types.ts | 11 + 7 files changed, 878 insertions(+), 2 deletions(-) create mode 100644 services/slackbot/src/slack/thread-routing.test.ts create mode 100644 services/slackbot/src/slack/thread-routing.ts diff --git a/services/slackbot/src/centaur/handoff.test.ts b/services/slackbot/src/centaur/handoff.test.ts index 88b82b72d..a42393a7f 100644 --- a/services/slackbot/src/centaur/handoff.test.ts +++ b/services/slackbot/src/centaur/handoff.test.ts @@ -180,4 +180,112 @@ describe('CentaurHandoff', () => { globalThis.fetch = originalFetch } }) + + it('includes routed thread metadata while delivering to the DM thread', async () => { + const originalFetch = globalThis.fetch + let capturedInit: RequestInit | undefined + const fetchMock = mock(async (_input: string | URL | Request, init?: RequestInit) => { + capturedInit = init + return new Response(JSON.stringify({ ok: true }), { status: 200 }) + }) + globalThis.fetch = fetchMock as any + try { + const handoff = new CentaurHandoff(config) + const event: NormalizedSlackEvent = { + thread_key: 'slack:T123:D123:1778884000.000000', + message_id: 'slack:T123:C123:1778883001.000000', + team_id: 'T123', + user_id: 'U123', + channel_id: 'D123', + thread_ts: '1778884000.000000', + is_mention: true, + is_addressed: true, + parts: [{ type: 'text', text: 'routed request' }], + route: { + mode: 'dm_from_thread_mention', + source_team_id: 'T123', + source_channel_id: 'C123', + source_thread_ts: '1778883000.000000', + source_message_ts: '1778883001.000000', + source_request_url: 'https://slack.com/archives/C123/p1778883001000000', + source_thread_url: 'https://slack.com/archives/C123/p1778883000000000', + dm_channel_id: 'D123', + dm_thread_ts: '1778884000.000000' + }, + slack: { + event_ts: '1778883001.000000', + message_ts: '1778883001.000000' + } + } + + await handoff.emit(event) + + const bodyText = capturedInit?.body + expect(typeof bodyText).toBe('string') + if (typeof bodyText !== 'string') throw new Error('expected JSON request body') + const body = JSON.parse(bodyText) as { + input: { + metadata: { route: NormalizedSlackEvent['route'] } + delivery: { channel: string; thread_ts: string } + } + } + expect(body.input.metadata.route).toEqual(event.route) + expect(body.input.delivery).toMatchObject({ + channel: 'D123', + thread_ts: '1778884000.000000' + }) + } finally { + globalThis.fetch = originalFetch + } + }) + + it('returns the newest completed execution result for routed DM publish approval', async () => { + const originalFetch = globalThis.fetch + const requestedUrls: string[] = [] + const fetchMock = mock(async (input: string | URL | Request) => { + const url = input instanceof Request ? input.url : String(input) + requestedUrls.push(url) + if (url.includes('/agent/threads/')) { + return new Response( + JSON.stringify({ + executions: [ + { execution_id: 'exe_running', status: 'running' }, + { execution_id: 'exe_empty', status: 'completed' }, + { execution_id: 'exe_success', status: 'completed' } + ] + }), + { status: 200 } + ) + } + if (url.endsWith('/agent/executions/exe_empty')) { + return new Response(JSON.stringify({ execution_id: 'exe_empty', result_text: ' ' }), { + status: 200 + }) + } + if (url.endsWith('/agent/executions/exe_success')) { + return new Response( + JSON.stringify({ execution_id: 'exe_success', result_text: 'Final answer\n' }), + { status: 200 } + ) + } + return new Response(JSON.stringify({ error: 'unexpected_url' }), { status: 404 }) + }) + globalThis.fetch = fetchMock as any + try { + const handoff = new CentaurHandoff(config) + + const result = await handoff.latestPostableExecutionResult( + 'slack:T123:D123:1778884000.000000' + ) + + expect(result).toEqual({ execution_id: 'exe_success', result_text: 'Final answer' }) + expect(requestedUrls).toEqual([ + 'http://centaur-api.test/agent/threads/slack%3AT123%3AD123%3A1778884000.000000/executions?limit=10', + 'http://centaur-api.test/agent/executions/exe_empty', + 'http://centaur-api.test/agent/executions/exe_success' + ]) + } finally { + globalThis.fetch = originalFetch + } + }) }) diff --git a/services/slackbot/src/centaur/handoff.ts b/services/slackbot/src/centaur/handoff.ts index e4a71a9a5..442e9100c 100644 --- a/services/slackbot/src/centaur/handoff.ts +++ b/services/slackbot/src/centaur/handoff.ts @@ -6,6 +6,11 @@ export type CentaurHandoffResult = | { ok: true; status: number; body: unknown } | { ok: false; status: number; body: unknown } +export type CentaurPostableExecutionResult = { + execution_id: string + result_text: string +} + export class CentaurHandoff { readonly config: AppConfig @@ -56,7 +61,8 @@ export class CentaurHandoff { app_id: event.slack.app_id, bot_user_id: event.slack.bot_user_id }, - is_mention: event.is_mention + is_mention: event.is_mention, + ...(event.route ? { route: event.route } : {}) }, delivery: { platform: 'slack', @@ -78,6 +84,90 @@ export class CentaurHandoff { } ) } + + async latestPostableExecutionResult( + threadKey: string + ): Promise { + return withSpan( + 'centaur.slackbot.latest_postable_execution_result', + clientSpanOptions({ + 'centaur.thread_key': threadKey + }), + async span => { + const executionsUrl = new URL( + `/agent/threads/${encodeURIComponent(threadKey)}/executions`, + this.config.CENTAUR_API_URL + ) + executionsUrl.searchParams.set('limit', '10') + const executionsResponse = await this.fetchJson(executionsUrl, threadKey) + spanAttributes(span, { + 'http.response.status_code': executionsResponse.response.status + }) + if (!executionsResponse.response.ok) return null + + const executions = executionSummaries(executionsResponse.body) + for (const execution of executions) { + if (execution.status !== 'completed') continue + const detailUrl = new URL( + `/agent/executions/${encodeURIComponent(execution.execution_id)}`, + this.config.CENTAUR_API_URL + ) + const detailResponse = await this.fetchJson(detailUrl, threadKey) + if (!detailResponse.response.ok) continue + + const result = postableExecutionResult(detailResponse.body) + if (!result) continue + spanAttributes(span, { + 'centaur.execution_id': result.execution_id, + 'centaur.slackbot.latest_postable_result_found': true + }) + return result + } + + spanAttributes(span, { + 'centaur.slackbot.latest_postable_result_found': false + }) + return null + } + ) + } + + private async fetchJson( + url: URL, + threadKey: string + ): Promise<{ response: Response; body: unknown }> { + const apiKey = centaurApiKey(this.config) + const response = await fetch(url, { + headers: { + 'X-Centaur-Thread-Key': threadKey, + ...injectTraceHeaders(), + ...(apiKey ? { Authorization: `Bearer ${apiKey}` } : {}) + } + }) + return { response, body: await readResponseBody(response) } + } +} + +function executionSummaries(value: unknown): Array<{ execution_id: string; status: string }> { + if (!value || typeof value !== 'object') return [] + const executions = (value as { executions?: unknown }).executions + if (!Array.isArray(executions)) return [] + return executions.flatMap(item => { + if (!item || typeof item !== 'object') return [] + const executionId = (item as { execution_id?: unknown }).execution_id + const status = (item as { status?: unknown }).status + if (typeof executionId !== 'string' || typeof status !== 'string') return [] + return [{ execution_id: executionId, status }] + }) +} + +function postableExecutionResult(value: unknown): CentaurPostableExecutionResult | null { + if (!value || typeof value !== 'object') return null + const executionId = (value as { execution_id?: unknown }).execution_id + const resultText = (value as { result_text?: unknown }).result_text + if (typeof executionId !== 'string' || typeof resultText !== 'string') return null + const trimmed = resultText.trim() + return trimmed ? { execution_id: executionId, result_text: trimmed } : null } async function readResponseBody(response: Response): Promise { diff --git a/services/slackbot/src/index.test.ts b/services/slackbot/src/index.test.ts index 79cbc926e..acb20e7d1 100644 --- a/services/slackbot/src/index.test.ts +++ b/services/slackbot/src/index.test.ts @@ -137,6 +137,145 @@ describe('Slack event HTTP dedupe', () => { } }) + it('routes addressed channel-thread mentions into a DM workflow handoff', async () => { + process.env.SLACK_SIGNING_SECRET = 'test-signing-secret' + process.env.SLACK_BOT_TOKEN = 'xoxb-thread-route-test' + delete process.env.SLACKBOT_API_KEY + delete process.env.CENTAUR_API_KEY + + const slackCalls: Array<{ path: string; body: Record }> = [] + const slackApi = Bun.serve({ + port: 0, + async fetch(request) { + const url = new URL(request.url) + const body = await slackApiBody(request) + slackCalls.push({ path: url.pathname, body }) + if (url.pathname === '/api/auth.test') { + return Response.json({ ok: true, user_id: 'UBOT', bot_id: 'BBOT' }) + } + if (url.pathname === '/api/conversations.replies') { + return Response.json({ + ok: true, + messages: [ + { + type: 'message', + user: 'UORIG', + text: 'Original customer issue', + ts: '1778883000.000000' + } + ] + }) + } + if (url.pathname === '/api/reactions.add') { + return Response.json({ ok: true }) + } + if (url.pathname === '/api/conversations.open') { + return Response.json({ ok: true, channel: { id: 'D123' } }) + } + if (url.pathname === '/api/chat.postMessage') { + return Response.json({ ok: true, channel: 'D123', ts: '1778884000.000000' }) + } + return Response.json({ ok: false, error: 'unexpected_slack_method' }, { status: 404 }) + } + }) + process.env.SLACK_API_URL = `http://127.0.0.1:${slackApi.port}/api/` + + const centaurRequests: Array<{ path: string; body: Record }> = [] + const centaurApi = Bun.serve({ + port: 0, + async fetch(request) { + const url = new URL(request.url) + const body = (await request.json()) as Record + centaurRequests.push({ path: url.pathname, body }) + if (url.pathname === '/workflows/runs') { + return Response.json({ ok: true, run_id: 'run_123' }) + } + return Response.json({ ok: false, error: 'unexpected_centaur_path' }, { status: 404 }) + } + }) + process.env.CENTAUR_API_URL = `http://127.0.0.1:${centaurApi.port}` + + try { + const { app } = await import(`./index.ts?thread_route=${Date.now()}`) + const body = JSON.stringify({ + type: 'event_callback', + event_id: 'Ev-thread-route', + team_id: 'T123', + event: { + type: 'app_mention', + user: 'U123', + channel: 'C123', + thread_ts: '1778883000.000000', + ts: '1778883001.000000', + text: '<@UBOT> investigate this' + } + }) + const waits: Promise[] = [] + const response = await app.request( + '/api/webhooks/slack', + signedJsonRequest(body, process.env.SLACK_SIGNING_SECRET), + {}, + { + waitUntil: (promise: Promise) => { + waits.push(promise) + } + } as any + ) + + expect(response.status).toBe(200) + expect(await response.json()).toEqual({ ok: true }) + await Promise.allSettled(waits) + + expect(slackCalls.find(call => call.path === '/api/reactions.add')?.body).toMatchObject({ + channel: 'C123', + timestamp: '1778883001.000000', + name: 'incoming_envelope' + }) + expect(slackCalls.find(call => call.path === '/api/conversations.open')?.body).toMatchObject({ + users: 'U123' + }) + const dmRoot = slackCalls.find(call => call.path === '/api/chat.postMessage') + expect(dmRoot?.body).toMatchObject({ + channel: 'D123' + }) + expect(dmRoot?.body.text).toContain('Original request:') + expect(dmRoot?.body.metadata).toMatchObject({ + event_type: 'centaur_thread_dm_route', + event_payload: expect.objectContaining({ + source_channel_id: 'C123', + source_thread_ts: '1778883000.000000', + source_message_ts: '1778883001.000000' + }) + }) + + const workflow = centaurRequests.find(request => request.path === '/workflows/runs')?.body as + | { input?: Record } + | undefined + expect(workflow?.input).toMatchObject({ + thread_key: 'slack:T123:D123:1778884000.000000', + delivery: { + channel: 'D123', + thread_ts: '1778884000.000000' + }, + metadata: { + route: expect.objectContaining({ + mode: 'dm_from_thread_mention', + source_channel_id: 'C123', + source_thread_ts: '1778883000.000000', + dm_channel_id: 'D123', + dm_thread_ts: '1778884000.000000' + }) + } + }) + expect(JSON.stringify(workflow?.input)).toContain( + 'Do you want me to post this answer if it succeeds?' + ) + } finally { + await slackApi.stop() + await centaurApi.stop() + } + }) + it('acks duplicate Slack envelopes without scheduling duplicate processing', async () => { process.env.SLACK_SIGNING_SECRET = 'test-signing-secret' process.env.SLACK_EVENT_DEDUP_TTL_MS = '600000' diff --git a/services/slackbot/src/index.ts b/services/slackbot/src/index.ts index 31fdc3989..54379dd1f 100644 --- a/services/slackbot/src/index.ts +++ b/services/slackbot/src/index.ts @@ -26,6 +26,10 @@ import { EnvSlackInstallationStore, SlackClientResolver } from './slack/installa import { normalizeSlackEnvelope } from './slack/normalize' import { markdownToStreamChunks } from './slack/render' import { verifySlackSignature } from './slack/signature' +import { + maybePublishApprovedDmResultToThread, + routeThreadMentionToDm +} from './slack/thread-routing' import { shouldAckWithReaction } from './slack/trivial-ack' import type { NormalizedSlackEvent, SlackEnvelope } from './slack/types' import type { AnyBlock, AnyChunk } from '@slack/types' @@ -571,7 +575,7 @@ async function processSlackEvent(envelope: SlackEnvelope): Promise { teamId: envelope.team_id, enterpriseId: envelope.enterprise_id }) - const normalized = await normalizeSlackEnvelope({ + let normalized = await normalizeSlackEnvelope({ envelope, botUserId: installation.botUserId, botId: installation.botId, @@ -602,6 +606,31 @@ async function processSlackEvent(envelope: SlackEnvelope): Promise { return } + const publishedDmResult = await maybePublishApprovedDmResultToThread({ + client, + event: normalized, + latestPostableResult: threadKey => handoff.latestPostableExecutionResult(threadKey) + }) + if (publishedDmResult) { + spanAttributes(span, { + 'centaur.slackbot.event_action': 'publish_dm_result_to_thread' + }) + return + } + + const routed = await routeThreadMentionToDm(client, normalized) + if (routed !== normalized) { + normalized = routed + spanAttributes(span, { + 'centaur.thread_key': normalized.thread_key, + 'slack.channel_id': normalized.channel_id, + 'slack.thread_ts': normalized.thread_ts, + 'centaur.slackbot.route_mode': normalized.route?.mode, + 'centaur.slackbot.route_source_channel_id': normalized.route?.source_channel_id, + 'centaur.slackbot.route_source_thread_ts': normalized.route?.source_thread_ts + }) + } + if (shouldAckWithReaction(normalized)) { spanAttributes(span, { 'centaur.slackbot.event_action': 'ack_reaction' diff --git a/services/slackbot/src/slack/thread-routing.test.ts b/services/slackbot/src/slack/thread-routing.test.ts new file mode 100644 index 000000000..d87fb0b66 --- /dev/null +++ b/services/slackbot/src/slack/thread-routing.test.ts @@ -0,0 +1,196 @@ +import { describe, expect, it, mock } from 'bun:test' +import type { WebClient } from '@slack/web-api' +import { + maybePublishApprovedDmResultToThread, + routeThreadMentionToDm, + shouldRouteThreadMentionToDm +} from './thread-routing' +import type { NormalizedSlackEvent } from './types' + +describe('thread mention DM routing', () => { + it('moves channel thread mentions into a DM thread', async () => { + const reactionsAdd = mock(async () => ({ ok: true })) + const conversationsOpen = mock(async () => ({ ok: true, channel: { id: 'D123' } })) + const chatPostMessage = mock(async () => ({ + ok: true, + channel: 'D123', + ts: '1778884000.000000' + })) + const client = slackClient({ + reactions: { add: reactionsAdd }, + conversations: { open: conversationsOpen }, + chat: { postMessage: chatPostMessage } + }) + const event = channelThreadMentionEvent() + + const routed = await routeThreadMentionToDm(client, event) + + expect(routed).not.toBe(event) + expect(routed.channel_id).toBe('D123') + expect(routed.thread_ts).toBe('1778884000.000000') + expect(routed.thread_key).toBe('slack:T123:D123:1778884000.000000') + expect(routed.route).toMatchObject({ + mode: 'dm_from_thread_mention', + source_channel_id: 'C123', + source_thread_ts: '1778883000.000000', + source_message_ts: '1778883001.000000', + dm_channel_id: 'D123', + dm_thread_ts: '1778884000.000000' + }) + expect(routed.parts[0]).toMatchObject({ + type: 'text', + text: expect.stringContaining('Do you want me to post this answer if it succeeds?') + }) + expect(reactionsAdd).toHaveBeenCalledWith({ + channel: 'C123', + timestamp: '1778883001.000000', + name: 'incoming_envelope' + }) + expect(conversationsOpen).toHaveBeenCalledWith({ users: 'U123' }) + expect(chatPostMessage).toHaveBeenCalledWith( + expect.objectContaining({ + channel: 'D123', + metadata: expect.objectContaining({ + event_type: 'centaur_thread_dm_route', + event_payload: expect.objectContaining({ + source_channel_id: 'C123', + source_thread_ts: '1778883000.000000', + source_message_ts: '1778883001.000000' + }) + }) + }) + ) + }) + + it('keeps explicit inline requests in the original thread', async () => { + const client = slackClient({ + reactions: { add: mock(async () => ({ ok: true })) }, + conversations: { open: mock(async () => ({ ok: true, channel: { id: 'D123' } })) }, + chat: { postMessage: mock(async () => ({ ok: true, channel: 'D123', ts: '1.0' })) } + }) + const event = channelThreadMentionEvent({ + parts: [{ type: 'text', text: '<@UBOT> investigate this and post it inline' }] + }) + + expect(shouldRouteThreadMentionToDm(event)).toBe(false) + expect(routeThreadMentionToDm(client, event)).resolves.toBe(event) + expect(client.reactions.add).not.toHaveBeenCalled() + expect(client.conversations.open).not.toHaveBeenCalled() + expect(client.chat.postMessage).not.toHaveBeenCalled() + }) + + it('posts the latest successful DM result back when the user approves', async () => { + const chatPostMessage = mock(async () => ({ + ok: true, + channel: 'C123', + ts: '1778885000.000000' + })) + const client = slackClient({ + conversations: { + replies: mock(async () => ({ + ok: true, + messages: [ + { + ts: '1778884000.000000', + metadata: { + event_type: 'centaur_thread_dm_route', + event_payload: { + source_team_id: 'T123', + source_channel_id: 'C123', + source_thread_ts: '1778883000.000000', + source_message_ts: '1778883001.000000', + source_request_url: 'https://slack.com/archives/C123/p1778883001000000', + source_thread_url: 'https://slack.com/archives/C123/p1778883000000000' + } + } + } + ] + })) + }, + chat: { postMessage: chatPostMessage } + }) + const latestPostableResult = mock(async () => ({ + execution_id: 'exe_123', + result_text: 'Final answer' + })) + const event = dmApprovalEvent() + + expect( + maybePublishApprovedDmResultToThread({ client, event, latestPostableResult }) + ).resolves.toBe(true) + + expect(latestPostableResult).toHaveBeenCalledWith('slack:T123:D123:1778884000.000000') + expect(chatPostMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + channel: 'C123', + thread_ts: '1778883000.000000', + text: expect.stringContaining('Final answer'), + metadata: expect.objectContaining({ + event_type: 'centaur_thread_result_posted', + event_payload: expect.objectContaining({ + source_dm_channel_id: 'D123', + source_dm_thread_ts: '1778884000.000000', + source_execution_id: 'exe_123', + approved_by_user_id: 'U123' + }) + }) + }) + ) + expect(chatPostMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + channel: 'D123', + thread_ts: '1778884000.000000', + text: expect.stringContaining('Posted to the original thread:') + }) + ) + }) +}) + +function channelThreadMentionEvent( + overrides: Partial = {} +): NormalizedSlackEvent { + return { + thread_key: 'slack:T123:C123:1778883000.000000', + message_id: 'slack:T123:C123:1778883001.000000', + team_id: 'T123', + user_id: 'U123', + channel_id: 'C123', + thread_ts: '1778883000.000000', + is_mention: true, + is_addressed: true, + parts: [{ type: 'text', text: '<@UBOT> investigate this' }], + slack: { + event_id: 'Ev123', + event_ts: '1778883001.000000', + message_ts: '1778883001.000000', + bot_user_id: 'UBOT' + }, + ...overrides + } +} + +function dmApprovalEvent(): NormalizedSlackEvent { + return { + thread_key: 'slack:T123:D123:1778884000.000000', + message_id: 'slack:T123:D123:1778884001.000000', + team_id: 'T123', + user_id: 'U123', + channel_id: 'D123', + thread_ts: '1778884000.000000', + is_mention: false, + is_addressed: true, + parts: [{ type: 'text', text: 'yes please post it' }], + slack: { + event_id: 'Ev456', + event_ts: '1778884001.000000', + message_ts: '1778884001.000000', + bot_user_id: 'UBOT' + } + } +} + +function slackClient(value: unknown): WebClient { + return value as WebClient +} diff --git a/services/slackbot/src/slack/thread-routing.ts b/services/slackbot/src/slack/thread-routing.ts new file mode 100644 index 000000000..e9a79f066 --- /dev/null +++ b/services/slackbot/src/slack/thread-routing.ts @@ -0,0 +1,303 @@ +import type { WebClient } from '@slack/web-api' +import { logWarn } from '../logging' +import type { NormalizedSlackEvent, NormalizedTextPart } from './types' + +const THREAD_MENTION_DM_REACTION = 'incoming_envelope' +const ROUTE_METADATA_TYPE = 'centaur_thread_dm_route' +const RESULT_POSTED_METADATA_TYPE = 'centaur_thread_result_posted' + +type SlackMessageWithMetadata = { + metadata?: { + event_type?: string + event_payload?: unknown + } +} + +type RoutedThreadPayload = { + source_team_id: string + source_channel_id: string + source_thread_ts: string + source_message_ts: string + source_request_url: string + source_thread_url: string +} + +export type PostableExecutionResult = { + execution_id: string + result_text: string +} + +export function shouldRouteThreadMentionToDm(event: NormalizedSlackEvent): boolean { + if (!isChannelThreadReply(event)) return false + if (!event.is_mention || !event.is_addressed) return false + return !requestsInlineThreadReply(event) +} + +export async function routeThreadMentionToDm( + client: WebClient, + event: NormalizedSlackEvent +): Promise { + if (!shouldRouteThreadMentionToDm(event)) return event + + await addDmReaction(client, event) + + const dmChannelId = await openDmChannel(client, event.user_id) + const sourceRequestUrl = slackMessageUrl( + event.channel_id, + event.slack.message_ts, + event.thread_ts + ) + const sourceThreadUrl = slackMessageUrl(event.channel_id, event.thread_ts) + const routePayload: RoutedThreadPayload = { + source_team_id: event.team_id, + source_channel_id: event.channel_id, + source_thread_ts: event.thread_ts, + source_message_ts: event.slack.message_ts, + source_request_url: sourceRequestUrl, + source_thread_url: sourceThreadUrl + } + + const dmRoot = await client.chat.postMessage({ + channel: dmChannelId, + text: [ + 'I moved this Watch Agent request into DM to avoid adding noise to the original thread.', + '', + `Original request: ${sourceRequestUrl}`, + `Original thread: ${sourceThreadUrl}`, + '', + 'I will reply in this DM thread.' + ].join('\n'), + metadata: { + event_type: ROUTE_METADATA_TYPE, + event_payload: routePayload + } + } as Parameters[0]) + if (!dmRoot.ok || !dmRoot.ts) { + throw new Error(`Failed to open routed DM thread: ${dmRoot.error ?? 'missing_ts'}`) + } + + const dmThreadTs = dmRoot.ts + return { + ...event, + thread_key: `slack:${event.team_id}:${dmChannelId}:${dmThreadTs}`, + channel_id: dmChannelId, + thread_ts: dmThreadTs, + parts: [routingInstructionPart(routePayload), ...event.parts], + route: { + mode: 'dm_from_thread_mention', + ...routePayload, + dm_channel_id: dmChannelId, + dm_thread_ts: dmThreadTs + } + } +} + +export async function maybePublishApprovedDmResultToThread(opts: { + client: WebClient + event: NormalizedSlackEvent + latestPostableResult: (threadKey: string) => Promise +}): Promise { + if (!isDmThreadReply(opts.event)) return false + if (!isPublishApproval(eventText(opts.event))) return false + + const routePayload = await loadRoutePayloadFromDmRoot(opts.client, opts.event) + if (!routePayload) return false + + const result = await opts.latestPostableResult(opts.event.thread_key) + if (!result) { + await opts.client.chat.postMessage({ + channel: opts.event.channel_id, + thread_ts: opts.event.thread_ts, + text: 'I do not have a successful result to post yet.' + }) + return true + } + + const posted = await opts.client.chat.postMessage({ + channel: routePayload.source_channel_id, + thread_ts: routePayload.source_thread_ts, + text: [ + `Posted from a private Watch Agent follow-up for <@${opts.event.user_id}>.`, + '', + result.result_text + ].join('\n'), + metadata: { + event_type: RESULT_POSTED_METADATA_TYPE, + event_payload: { + source_dm_channel_id: opts.event.channel_id, + source_dm_thread_ts: opts.event.thread_ts, + source_execution_id: result.execution_id, + approved_by_user_id: opts.event.user_id + } + } + } as Parameters[0]) + if (!posted.ok) { + throw new Error(`Failed to post routed result: ${posted.error ?? 'unknown_error'}`) + } + + await opts.client.chat.postMessage({ + channel: opts.event.channel_id, + thread_ts: opts.event.thread_ts, + text: posted.ts + ? `Posted to the original thread: ${slackMessageUrl( + routePayload.source_channel_id, + posted.ts, + routePayload.source_thread_ts + )}` + : 'Posted to the original thread.' + }) + return true +} + +function isChannelThreadReply(event: NormalizedSlackEvent): boolean { + return !event.channel_id.startsWith('D') && event.slack.message_ts !== event.thread_ts +} + +function isDmThreadReply(event: NormalizedSlackEvent): boolean { + return event.channel_id.startsWith('D') && event.slack.message_ts !== event.thread_ts +} + +async function addDmReaction(client: WebClient, event: NormalizedSlackEvent): Promise { + try { + await client.reactions.add({ + channel: event.channel_id, + timestamp: event.slack.message_ts, + name: THREAD_MENTION_DM_REACTION + }) + } catch (error) { + logWarn('slack_thread_dm_reaction_failed', { + channel_id: event.channel_id, + thread_ts: event.thread_ts, + message_ts: event.slack.message_ts, + error: error instanceof Error ? error.message : String(error) + }) + } +} + +async function openDmChannel(client: WebClient, userId: string): Promise { + const response = await client.conversations.open({ users: userId }) + const channelId = response.channel?.id + if (!response.ok || !channelId) { + throw new Error(`Failed to open DM channel: ${response.error ?? 'missing_channel_id'}`) + } + return channelId +} + +async function loadRoutePayloadFromDmRoot( + client: WebClient, + event: NormalizedSlackEvent +): Promise { + const response = await client.conversations.replies({ + channel: event.channel_id, + ts: event.thread_ts, + limit: 10 + }) + if (!response.ok || !Array.isArray(response.messages)) return null + + for (const message of response.messages as SlackMessageWithMetadata[]) { + const metadata = message.metadata + if (metadata?.event_type !== ROUTE_METADATA_TYPE) continue + return parseRoutePayload(metadata.event_payload) + } + return null +} + +function parseRoutePayload(value: unknown): RoutedThreadPayload | null { + if (!value || typeof value !== 'object') return null + const payload = value as Record + const source_team_id = stringField(payload, 'source_team_id') + const source_channel_id = stringField(payload, 'source_channel_id') + const source_thread_ts = stringField(payload, 'source_thread_ts') + const source_message_ts = stringField(payload, 'source_message_ts') + const source_request_url = stringField(payload, 'source_request_url') + const source_thread_url = stringField(payload, 'source_thread_url') + if ( + !source_team_id || + !source_channel_id || + !source_thread_ts || + !source_message_ts || + !source_request_url || + !source_thread_url + ) { + return null + } + return { + source_team_id, + source_channel_id, + source_thread_ts, + source_message_ts, + source_request_url, + source_thread_url + } +} + +function stringField(payload: Record, key: string): string { + const value = payload[key] + return typeof value === 'string' ? value.trim() : '' +} + +function routingInstructionPart(payload: RoutedThreadPayload): NormalizedTextPart { + return { + type: 'text', + text: [ + 'Slack routing note: this Watch Agent request was moved from an existing Slack thread into this private DM thread to avoid spamming the original thread.', + `Original request: ${payload.source_request_url}`, + `Original top-level thread: ${payload.source_thread_url}`, + 'Use the backfilled original thread history as context, keep the answer in this DM, and end your final answer with: "Do you want me to post this answer if it succeeds?"' + ].join('\n') + } +} + +function requestsInlineThreadReply(event: NormalizedSlackEvent): boolean { + return INLINE_REPLY_RE.test(eventText(event)) +} + +function eventText(event: NormalizedSlackEvent): string { + return event.parts + .filter(part => part.type === 'text') + .map(part => part.text) + .join('\n') +} + +const INLINE_REPLY_RE = + /\b(?:post|reply|respond|answer)\s+(?:(?:just|only)\s+)?(?:the\s+)?(?:(?:answer|results?|response|reply|it|this)\s+)?(?:inline|here|in\s+(?:this|the)\s+thread)\b|\b(?:keep|leave)\s+(?:it|this|the\s+(?:answer|results?|response|reply))\s+(?:inline|here|in\s+(?:this|the)\s+thread)\b/i + +function isPublishApproval(text: string): boolean { + const normalized = text + .toLowerCase() + .replace(/[!?.,…]+/gu, ' ') + .replace(/\s+/gu, ' ') + .trim() + if (!normalized || normalized.length > 120) return false + if ( + new Set([ + 'yes', + 'yes please', + 'yep', + 'yup', + 'yeah', + 'sure', + 'sure thing', + 'ok', + 'okay', + 'sgtm', + 'please do', + 'go ahead', + 'do it' + ]).has(normalized) + ) { + return true + } + return ( + /\bpost\b/.test(normalized) && /\b(it|this|answer|results?|thread|there)\b/.test(normalized) + ) +} + +export function slackMessageUrl(channelId: string, messageTs: string, threadTs?: string): string { + const url = new URL(`https://slack.com/archives/${channelId}/p${messageTs.replace('.', '')}`) + if (threadTs && threadTs !== messageTs) { + url.searchParams.set('thread_ts', threadTs) + url.searchParams.set('cid', channelId) + } + return url.toString() +} diff --git a/services/slackbot/src/slack/types.ts b/services/slackbot/src/slack/types.ts index faba682e1..575523f3d 100644 --- a/services/slackbot/src/slack/types.ts +++ b/services/slackbot/src/slack/types.ts @@ -31,6 +31,17 @@ export type NormalizedSlackEvent = { is_mention: boolean is_addressed: boolean parts: NormalizedPart[] + route?: { + mode: 'dm_from_thread_mention' + source_team_id: string + source_channel_id: string + source_thread_ts: string + source_message_ts: string + source_request_url: string + source_thread_url: string + dm_channel_id: string + dm_thread_ts: string + } history_messages?: Array<{ message_id: string role?: 'user' | 'assistant' From 6b17e23ff4fa782abda9549dd7affb4e2b29f360 Mon Sep 17 00:00:00 2001 From: Samuel Clay Date: Sun, 14 Jun 2026 09:19:38 -0400 Subject: [PATCH 2/2] docs: track thread mention DM routing --- FORK.md | 1 + 1 file changed, 1 insertion(+) diff --git a/FORK.md b/FORK.md index 8cf34c867..921f68a86 100644 --- a/FORK.md +++ b/FORK.md @@ -21,3 +21,4 @@ line in the PR that syncs it back. | 5a598aae | api: respawn unresumable suspended sandboxes (pod backend) + post runtime-start failures once per thread | [#11](https://github.com/Tavus-Engineering/centaur/pull/11) | | 5ac94c35 | slackbot: channel-thread replies require @-mention (undoes joined-thread auto-reply from #2); codex wrapper falls back to fresh thread on dead rollout | [#12](https://github.com/Tavus-Engineering/centaur/pull/12) | | a9d47a12 | api: finalize codex turn.failed as failed_permanent + post failure notice to Slack; raise iron-proxy upstream header timeout to 300s (codex remote compaction); fold signoz/aws header allowlist into base.yaml | [#13](https://github.com/Tavus-Engineering/centaur/pull/13) | +| dcfd647c | slackbot: route in-thread Watch Agent mentions through DM and post results back only after approval | [#14](https://github.com/Tavus-Engineering/centaur/pull/14) |