Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions FORK.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ line in the PR that syncs it back.
| 5a598aae | api: respawn unresumable suspended sandboxes (pod backend) + post runtime-start failures once per thread | [#11](https://github.com/Tavus-Engineering/centaur/pull/11) |
| 5ac94c35 | slackbot: channel-thread replies require @-mention (undoes joined-thread auto-reply from #2); codex wrapper falls back to fresh thread on dead rollout | [#12](https://github.com/Tavus-Engineering/centaur/pull/12) |
| a9d47a12 | api: finalize codex turn.failed as failed_permanent + post failure notice to Slack; raise iron-proxy upstream header timeout to 300s (codex remote compaction); fold signoz/aws header allowlist into base.yaml | [#13](https://github.com/Tavus-Engineering/centaur/pull/13) |
| dcfd647c | slackbot: route in-thread Watch Agent mentions through DM and post results back only after approval | [#14](https://github.com/Tavus-Engineering/centaur/pull/14) |
108 changes: 108 additions & 0 deletions services/slackbot/src/centaur/handoff.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,112 @@ describe('CentaurHandoff', () => {
globalThis.fetch = originalFetch
}
})

it('includes routed thread metadata while delivering to the DM thread', async () => {
const originalFetch = globalThis.fetch
let capturedInit: RequestInit | undefined
const fetchMock = mock(async (_input: string | URL | Request, init?: RequestInit) => {
capturedInit = init
return new Response(JSON.stringify({ ok: true }), { status: 200 })
})
globalThis.fetch = fetchMock as any
try {
const handoff = new CentaurHandoff(config)
const event: NormalizedSlackEvent = {
thread_key: 'slack:T123:D123:1778884000.000000',
message_id: 'slack:T123:C123:1778883001.000000',
team_id: 'T123',
user_id: 'U123',
channel_id: 'D123',
thread_ts: '1778884000.000000',
is_mention: true,
is_addressed: true,
parts: [{ type: 'text', text: 'routed request' }],
route: {
mode: 'dm_from_thread_mention',
source_team_id: 'T123',
source_channel_id: 'C123',
source_thread_ts: '1778883000.000000',
source_message_ts: '1778883001.000000',
source_request_url: 'https://slack.com/archives/C123/p1778883001000000',
source_thread_url: 'https://slack.com/archives/C123/p1778883000000000',
dm_channel_id: 'D123',
dm_thread_ts: '1778884000.000000'
},
slack: {
event_ts: '1778883001.000000',
message_ts: '1778883001.000000'
}
}

await handoff.emit(event)

const bodyText = capturedInit?.body
expect(typeof bodyText).toBe('string')
if (typeof bodyText !== 'string') throw new Error('expected JSON request body')
const body = JSON.parse(bodyText) as {
input: {
metadata: { route: NormalizedSlackEvent['route'] }
delivery: { channel: string; thread_ts: string }
}
}
expect(body.input.metadata.route).toEqual(event.route)
expect(body.input.delivery).toMatchObject({
channel: 'D123',
thread_ts: '1778884000.000000'
})
} finally {
globalThis.fetch = originalFetch
}
})

it('returns the newest completed execution result for routed DM publish approval', async () => {
const originalFetch = globalThis.fetch
const requestedUrls: string[] = []
const fetchMock = mock(async (input: string | URL | Request) => {
const url = input instanceof Request ? input.url : String(input)
requestedUrls.push(url)
if (url.includes('/agent/threads/')) {
return new Response(
JSON.stringify({
executions: [
{ execution_id: 'exe_running', status: 'running' },
{ execution_id: 'exe_empty', status: 'completed' },
{ execution_id: 'exe_success', status: 'completed' }
]
}),
{ status: 200 }
)
}
if (url.endsWith('/agent/executions/exe_empty')) {
return new Response(JSON.stringify({ execution_id: 'exe_empty', result_text: ' ' }), {
status: 200
})
}
if (url.endsWith('/agent/executions/exe_success')) {
return new Response(
JSON.stringify({ execution_id: 'exe_success', result_text: 'Final answer\n' }),
{ status: 200 }
)
}
return new Response(JSON.stringify({ error: 'unexpected_url' }), { status: 404 })
})
globalThis.fetch = fetchMock as any
try {
const handoff = new CentaurHandoff(config)

const result = await handoff.latestPostableExecutionResult(
'slack:T123:D123:1778884000.000000'
)

expect(result).toEqual({ execution_id: 'exe_success', result_text: 'Final answer' })
expect(requestedUrls).toEqual([
'http://centaur-api.test/agent/threads/slack%3AT123%3AD123%3A1778884000.000000/executions?limit=10',
'http://centaur-api.test/agent/executions/exe_empty',
'http://centaur-api.test/agent/executions/exe_success'
])
} finally {
globalThis.fetch = originalFetch
}
})
})
92 changes: 91 additions & 1 deletion services/slackbot/src/centaur/handoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ export type CentaurHandoffResult =
| { ok: true; status: number; body: unknown }
| { ok: false; status: number; body: unknown }

export type CentaurPostableExecutionResult = {
execution_id: string
result_text: string
}

export class CentaurHandoff {
readonly config: AppConfig

Expand Down Expand Up @@ -56,7 +61,8 @@ export class CentaurHandoff {
app_id: event.slack.app_id,
bot_user_id: event.slack.bot_user_id
},
is_mention: event.is_mention
is_mention: event.is_mention,
...(event.route ? { route: event.route } : {})
},
delivery: {
platform: 'slack',
Expand All @@ -78,6 +84,90 @@ export class CentaurHandoff {
}
)
}

async latestPostableExecutionResult(
threadKey: string
): Promise<CentaurPostableExecutionResult | null> {
return withSpan(
'centaur.slackbot.latest_postable_execution_result',
clientSpanOptions({
'centaur.thread_key': threadKey
}),
async span => {
const executionsUrl = new URL(
`/agent/threads/${encodeURIComponent(threadKey)}/executions`,
this.config.CENTAUR_API_URL
)
executionsUrl.searchParams.set('limit', '10')
const executionsResponse = await this.fetchJson(executionsUrl, threadKey)
spanAttributes(span, {
'http.response.status_code': executionsResponse.response.status
})
if (!executionsResponse.response.ok) return null

const executions = executionSummaries(executionsResponse.body)
for (const execution of executions) {
if (execution.status !== 'completed') continue
const detailUrl = new URL(
`/agent/executions/${encodeURIComponent(execution.execution_id)}`,
this.config.CENTAUR_API_URL
)
const detailResponse = await this.fetchJson(detailUrl, threadKey)
if (!detailResponse.response.ok) continue

const result = postableExecutionResult(detailResponse.body)
if (!result) continue
spanAttributes(span, {
'centaur.execution_id': result.execution_id,
'centaur.slackbot.latest_postable_result_found': true
})
return result
}

spanAttributes(span, {
'centaur.slackbot.latest_postable_result_found': false
})
return null
}
)
}

private async fetchJson(
url: URL,
threadKey: string
): Promise<{ response: Response; body: unknown }> {
const apiKey = centaurApiKey(this.config)
const response = await fetch(url, {
headers: {
'X-Centaur-Thread-Key': threadKey,
...injectTraceHeaders(),
...(apiKey ? { Authorization: `Bearer ${apiKey}` } : {})
}
})
return { response, body: await readResponseBody(response) }
}
}

function executionSummaries(value: unknown): Array<{ execution_id: string; status: string }> {
if (!value || typeof value !== 'object') return []
const executions = (value as { executions?: unknown }).executions
if (!Array.isArray(executions)) return []
return executions.flatMap(item => {
if (!item || typeof item !== 'object') return []
const executionId = (item as { execution_id?: unknown }).execution_id
const status = (item as { status?: unknown }).status
if (typeof executionId !== 'string' || typeof status !== 'string') return []
return [{ execution_id: executionId, status }]
})
}

function postableExecutionResult(value: unknown): CentaurPostableExecutionResult | null {
if (!value || typeof value !== 'object') return null
const executionId = (value as { execution_id?: unknown }).execution_id
const resultText = (value as { result_text?: unknown }).result_text
if (typeof executionId !== 'string' || typeof resultText !== 'string') return null
const trimmed = resultText.trim()
return trimmed ? { execution_id: executionId, result_text: trimmed } : null
}

async function readResponseBody(response: Response): Promise<unknown> {
Expand Down
139 changes: 139 additions & 0 deletions services/slackbot/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,145 @@ describe('Slack event HTTP dedupe', () => {
}
})

it('routes addressed channel-thread mentions into a DM workflow handoff', async () => {
process.env.SLACK_SIGNING_SECRET = 'test-signing-secret'
process.env.SLACK_BOT_TOKEN = 'xoxb-thread-route-test'
delete process.env.SLACKBOT_API_KEY
delete process.env.CENTAUR_API_KEY

const slackCalls: Array<{ path: string; body: Record<string, unknown> }> = []
const slackApi = Bun.serve({
port: 0,
async fetch(request) {
const url = new URL(request.url)
const body = await slackApiBody(request)
slackCalls.push({ path: url.pathname, body })
if (url.pathname === '/api/auth.test') {
return Response.json({ ok: true, user_id: 'UBOT', bot_id: 'BBOT' })
}
if (url.pathname === '/api/conversations.replies') {
return Response.json({
ok: true,
messages: [
{
type: 'message',
user: 'UORIG',
text: 'Original customer issue',
ts: '1778883000.000000'
}
]
})
}
if (url.pathname === '/api/reactions.add') {
return Response.json({ ok: true })
}
if (url.pathname === '/api/conversations.open') {
return Response.json({ ok: true, channel: { id: 'D123' } })
}
if (url.pathname === '/api/chat.postMessage') {
return Response.json({ ok: true, channel: 'D123', ts: '1778884000.000000' })
}
return Response.json({ ok: false, error: 'unexpected_slack_method' }, { status: 404 })
}
})
process.env.SLACK_API_URL = `http://127.0.0.1:${slackApi.port}/api/`

const centaurRequests: Array<{ path: string; body: Record<string, unknown> }> = []
const centaurApi = Bun.serve({
port: 0,
async fetch(request) {
const url = new URL(request.url)
const body = (await request.json()) as Record<string, unknown>
centaurRequests.push({ path: url.pathname, body })
if (url.pathname === '/workflows/runs') {
return Response.json({ ok: true, run_id: 'run_123' })
}
return Response.json({ ok: false, error: 'unexpected_centaur_path' }, { status: 404 })
}
})
process.env.CENTAUR_API_URL = `http://127.0.0.1:${centaurApi.port}`

try {
const { app } = await import(`./index.ts?thread_route=${Date.now()}`)
const body = JSON.stringify({
type: 'event_callback',
event_id: 'Ev-thread-route',
team_id: 'T123',
event: {
type: 'app_mention',
user: 'U123',
channel: 'C123',
thread_ts: '1778883000.000000',
ts: '1778883001.000000',
text: '<@UBOT> investigate this'
}
})
const waits: Promise<unknown>[] = []
const response = await app.request(
'/api/webhooks/slack',
signedJsonRequest(body, process.env.SLACK_SIGNING_SECRET),
{},
{
waitUntil: (promise: Promise<unknown>) => {
waits.push(promise)
}
} as any
)

expect(response.status).toBe(200)
expect(await response.json()).toEqual({ ok: true })
await Promise.allSettled(waits)

expect(slackCalls.find(call => call.path === '/api/reactions.add')?.body).toMatchObject({
channel: 'C123',
timestamp: '1778883001.000000',
name: 'incoming_envelope'
})
expect(slackCalls.find(call => call.path === '/api/conversations.open')?.body).toMatchObject({
users: 'U123'
})
const dmRoot = slackCalls.find(call => call.path === '/api/chat.postMessage')
expect(dmRoot?.body).toMatchObject({
channel: 'D123'
})
expect(dmRoot?.body.text).toContain('Original request:')
expect(dmRoot?.body.metadata).toMatchObject({
event_type: 'centaur_thread_dm_route',
event_payload: expect.objectContaining({
source_channel_id: 'C123',
source_thread_ts: '1778883000.000000',
source_message_ts: '1778883001.000000'
})
})

const workflow = centaurRequests.find(request => request.path === '/workflows/runs')?.body as
| { input?: Record<string, unknown> }
| undefined
expect(workflow?.input).toMatchObject({
thread_key: 'slack:T123:D123:1778884000.000000',
delivery: {
channel: 'D123',
thread_ts: '1778884000.000000'
},
metadata: {
route: expect.objectContaining({
mode: 'dm_from_thread_mention',
source_channel_id: 'C123',
source_thread_ts: '1778883000.000000',
dm_channel_id: 'D123',
dm_thread_ts: '1778884000.000000'
})
}
})
expect(JSON.stringify(workflow?.input)).toContain(
'Do you want me to post this answer if it succeeds?'
)
} finally {
await slackApi.stop()
await centaurApi.stop()
}
})

it('acks duplicate Slack envelopes without scheduling duplicate processing', async () => {
process.env.SLACK_SIGNING_SECRET = 'test-signing-secret'
process.env.SLACK_EVENT_DEDUP_TTL_MS = '600000'
Expand Down
Loading
Loading