Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ Set by `sudo baudbot broker register` when using brokered Slack OAuth flow.
| `SLACK_BROKER_SIGNING_PUBLIC_KEY` | Broker Ed25519 public signing key (base64) |
| `SLACK_BROKER_POLL_INTERVAL_MS` | Inbox poll interval in milliseconds (default: `3000`) |
| `SLACK_BROKER_MAX_MESSAGES` | Max leased messages per poll request (default: `10`) |
| `SLACK_BROKER_WAIT_SECONDS` | Long-poll wait window for `/api/inbox/pull` (default: `20`, set `0` for immediate short-poll, max `25`) |
| `SLACK_BROKER_DEDUPE_TTL_MS` | Dedupe cache TTL in milliseconds (default: `1200000`) |

### Kernel (Cloud Browsers)
Expand Down Expand Up @@ -190,6 +191,7 @@ SLACK_BROKER_URL=https://broker.example.com
SLACK_BROKER_WORKSPACE_ID=T0123ABCD
SLACK_BROKER_POLL_INTERVAL_MS=3000
SLACK_BROKER_MAX_MESSAGES=10
SLACK_BROKER_WAIT_SECONDS=20
SLACK_BROKER_DEDUPE_TTL_MS=1200000

# Experimental features (required for email)
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ sudo baudbot broker register \
--registration-token <token-from-dashboard-callback>
```

Broker pull mode uses long-polling by default (`SLACK_BROKER_WAIT_SECONDS=20`, max `25`; set `0` for immediate short-poll behavior).

Need to rotate/update a key later?

```bash
Expand Down
2 changes: 2 additions & 0 deletions bin/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ else
SLACK_BROKER_SIGNING_PUBLIC_KEY \
SLACK_BROKER_POLL_INTERVAL_MS \
SLACK_BROKER_MAX_MESSAGES \
SLACK_BROKER_WAIT_SECONDS \
SLACK_BROKER_DEDUPE_TTL_MS

prompt_secret "SLACK_BOT_TOKEN" \
Expand Down Expand Up @@ -613,6 +614,7 @@ ordered_keys=(
SLACK_BROKER_SIGNING_PUBLIC_KEY
SLACK_BROKER_POLL_INTERVAL_MS
SLACK_BROKER_MAX_MESSAGES
SLACK_BROKER_WAIT_SECONDS
SLACK_BROKER_DEDUPE_TTL_MS
BAUDBOT_AGENT_USER
BAUDBOT_AGENT_HOME
Expand Down
66 changes: 52 additions & 14 deletions slack-bridge/broker-bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,32 @@ import {
} from "./security.mjs";
import {
canonicalizeEnvelope,
canonicalizeOutbound,
canonicalizeProtocolRequest,
canonicalizeSendRequest,
} from "./crypto.mjs";

const SOCKET_DIR = path.join(homedir(), ".pi", "session-control");
const AGENT_TIMEOUT_MS = 120_000;
const API_PORT = parseInt(process.env.BRIDGE_API_PORT || "7890", 10);
const POLL_INTERVAL_MS = parseInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "3000", 10);
const MAX_MESSAGES = parseInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 10);
const DEDUPE_TTL_MS = parseInt(process.env.SLACK_BROKER_DEDUPE_TTL_MS || String(20 * 60 * 1000), 10);

function clampInt(value, min, max, fallback) {
const parsed = Number.parseInt(String(value ?? ""), 10);
if (!Number.isFinite(parsed)) return fallback;
return Math.min(max, Math.max(min, parsed));
}

const API_PORT = clampInt(process.env.BRIDGE_API_PORT || "7890", 0, 65535, 7890);
const POLL_INTERVAL_MS = clampInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "3000", 0, 60_000, 3000);
const MAX_MESSAGES = clampInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 1, 100, 10);
const MAX_WAIT_SECONDS = 25;
const BROKER_WAIT_SECONDS = clampInt(process.env.SLACK_BROKER_WAIT_SECONDS || "20", 0, MAX_WAIT_SECONDS, 20);
const DEDUPE_TTL_MS = clampInt(
process.env.SLACK_BROKER_DEDUPE_TTL_MS || String(20 * 60 * 1000),
1_000,
7 * 24 * 60 * 60 * 1000,
20 * 60 * 1000,
);
const MAX_BACKOFF_MS = 30_000;
const INBOX_PROTOCOL_VERSION = "2026-02-1";
const BROKER_HEALTH_PATH = path.join(homedir(), ".pi", "agent", "broker-health.json");

function ts() {
Expand Down Expand Up @@ -351,12 +366,25 @@ function getThreadId(channel, threadTs) {
return id;
}

function signRequest(action, timestamp, payloadField) {
const canonical = canonicalizeOutbound(workspaceId, action, timestamp, payloadField);
function signProtocolRequest(action, timestamp, payload) {
const canonical = canonicalizeProtocolRequest(
workspaceId,
INBOX_PROTOCOL_VERSION,
action,
timestamp,
payload,
);
const sig = sodium.crypto_sign_detached(canonical, cryptoState.serverSignSecretKey);
return toBase64(sig);
}

function signPullRequest(timestamp, maxMessages, waitSeconds) {
return signProtocolRequest("inbox.pull", timestamp, {
max_messages: maxMessages,
wait_seconds: waitSeconds,
});
}

async function brokerFetch(pathname, body) {
const url = `${brokerBaseUrl}${pathname}`;
const response = await fetch(url, {
Expand Down Expand Up @@ -389,26 +417,30 @@ async function brokerFetch(pathname, body) {

async function pullInbox() {
const timestamp = Math.floor(Date.now() / 1000);
const signature = signRequest("inbox.pull", timestamp, String(MAX_MESSAGES));
const signature = signPullRequest(timestamp, MAX_MESSAGES, BROKER_WAIT_SECONDS);

const payload = await brokerFetch("/api/inbox/pull", {
const body = {
workspace_id: workspaceId,
protocol_version: INBOX_PROTOCOL_VERSION,
max_messages: MAX_MESSAGES,
wait_seconds: BROKER_WAIT_SECONDS,
timestamp,
signature,
});
};

const payload = await brokerFetch("/api/inbox/pull", body);

return Array.isArray(payload.messages) ? payload.messages : [];
}

async function ackInbox(messageIds) {
if (messageIds.length === 0) return;
const timestamp = Math.floor(Date.now() / 1000);
const joined = messageIds.join(",");
const signature = signRequest("inbox.ack", timestamp, joined);
const signature = signProtocolRequest("inbox.ack", timestamp, { message_ids: messageIds });

await brokerFetch("/api/inbox/ack", {
workspace_id: workspaceId,
protocol_version: INBOX_PROTOCOL_VERSION,
message_ids: messageIds,
timestamp,
signature,
Expand Down Expand Up @@ -918,7 +950,9 @@ async function startPollLoop() {
}

backoffMs = POLL_INTERVAL_MS;
await sleep(POLL_INTERVAL_MS);
if (BROKER_WAIT_SECONDS <= 0) {
await sleep(POLL_INTERVAL_MS);
}
} catch (err) {
if (!pollSucceeded) {
markHealth("poll", false, err);
Expand Down Expand Up @@ -960,7 +994,11 @@ async function startPollLoop() {
logInfo(` outbound mode: ${outboundMode} ${outboundMode === "direct" ? "(using SLACK_BOT_TOKEN)" : "(via broker)"}`);
logInfo(` broker: ${brokerBaseUrl}`);
logInfo(` workspace: ${workspaceId}`);
logInfo(` poll interval: ${POLL_INTERVAL_MS}ms, max messages: ${MAX_MESSAGES}`);
logInfo(` inbox protocol: ${INBOX_PROTOCOL_VERSION}`);
logInfo(
` poll mode: ${BROKER_WAIT_SECONDS > 0 ? `long-poll (${BROKER_WAIT_SECONDS}s)` : "short-poll"}, ` +
`interval: ${POLL_INTERVAL_MS}ms, max messages: ${MAX_MESSAGES}`,
);
logInfo(` allowed users: ${ALLOWED_USERS.length || "all"}`);
logInfo(` pi socket: ${socketPath || "(not found — will retry on message)"}`);
await startPollLoop();
Expand Down
18 changes: 18 additions & 0 deletions slack-bridge/crypto.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,24 @@ export function canonicalizeOutbound(workspace, action, timestamp, encryptedBody
return utf8Bytes(`${workspace}|${action}|${timestamp}|${encryptedBody}`);
}

/**
* Construct canonical bytes for protocol-versioned inbox pull/ack signing.
*
* Uses deterministic JSON serialization (sorted keys) to match broker
* json-stable-stringify canonicalization.
*/
export function canonicalizeProtocolRequest(workspace, protocolVersion, action, timestamp, payload) {
return utf8Bytes(
stableStringify({
workspace_id: workspace,
protocol_version: protocolVersion,
action,
timestamp,
payload,
}),
);
}

/**
* Construct canonical bytes for /api/send request signing.
*
Expand Down
25 changes: 25 additions & 0 deletions slack-bridge/crypto.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
stableStringify,
canonicalizeEnvelope,
canonicalizeOutbound,
canonicalizeProtocolRequest,
canonicalizeSendRequest,
} from "./crypto.mjs";

Expand Down Expand Up @@ -151,6 +152,30 @@ describe("canonicalizeOutbound", () => {
});
});

// ── canonicalizeProtocolRequest ─────────────────────────────────────────────

describe("canonicalizeProtocolRequest", () => {
it("produces stable JSON payload for protocol-versioned inbox.pull", () => {
const result = decode(
canonicalizeProtocolRequest("T123", "2026-02-1", "inbox.pull", 1700000000, {
max_messages: 10,
wait_seconds: 20,
}),
);
assert.equal(
result,
'{"action":"inbox.pull","payload":{"max_messages":10,"wait_seconds":20},"protocol_version":"2026-02-1","timestamp":1700000000,"workspace_id":"T123"}',
);
});

it("returns Uint8Array", () => {
const result = canonicalizeProtocolRequest("T123", "2026-02-1", "inbox.ack", 1700000000, {
message_ids: ["m1", "m2"],
});
assert.ok(result instanceof Uint8Array);
});
});

// ── canonicalizeSendRequest ─────────────────────────────────────────────────

describe("canonicalizeSendRequest", () => {
Expand Down
Loading