Skip to content

Commit 2c88a83

Browse files
authored
bridge: add broker long-poll v2 pull signing (#123)
1 parent 55acd30 commit 2c88a83

7 files changed

Lines changed: 376 additions & 15 deletions

File tree

CONFIGURATION.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ Set by `sudo baudbot broker register` when using brokered Slack OAuth flow.
106106
| `SLACK_BROKER_SIGNING_PUBLIC_KEY` | Broker Ed25519 public signing key (base64) |
107107
| `SLACK_BROKER_POLL_INTERVAL_MS` | Inbox poll interval in milliseconds (default: `3000`) |
108108
| `SLACK_BROKER_MAX_MESSAGES` | Max leased messages per poll request (default: `10`) |
109+
| `SLACK_BROKER_WAIT_SECONDS` | Long-poll wait window for `/api/inbox/pull` (default: `20`, set `0` for immediate short-poll, max `25`) |
109110
| `SLACK_BROKER_DEDUPE_TTL_MS` | Dedupe cache TTL in milliseconds (default: `1200000`) |
110111

111112
### Kernel (Cloud Browsers)
@@ -190,6 +191,7 @@ SLACK_BROKER_URL=https://broker.example.com
190191
SLACK_BROKER_WORKSPACE_ID=T0123ABCD
191192
SLACK_BROKER_POLL_INTERVAL_MS=3000
192193
SLACK_BROKER_MAX_MESSAGES=10
194+
SLACK_BROKER_WAIT_SECONDS=20
193195
SLACK_BROKER_DEDUPE_TTL_MS=1200000
194196

195197
# Experimental features (required for email)

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ sudo baudbot broker register \
9898
--registration-token <token-from-dashboard-callback>
9999
```
100100

101+
Broker pull mode uses long-polling by default (`SLACK_BROKER_WAIT_SECONDS=20`, max `25`; set `0` for immediate short-poll behavior).
102+
101103
Need to rotate/update a key later?
102104

103105
```bash

bin/config.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,7 @@ else
451451
SLACK_BROKER_SIGNING_PUBLIC_KEY \
452452
SLACK_BROKER_POLL_INTERVAL_MS \
453453
SLACK_BROKER_MAX_MESSAGES \
454+
SLACK_BROKER_WAIT_SECONDS \
454455
SLACK_BROKER_DEDUPE_TTL_MS
455456

456457
prompt_secret "SLACK_BOT_TOKEN" \
@@ -613,6 +614,7 @@ ordered_keys=(
613614
SLACK_BROKER_SIGNING_PUBLIC_KEY
614615
SLACK_BROKER_POLL_INTERVAL_MS
615616
SLACK_BROKER_MAX_MESSAGES
617+
SLACK_BROKER_WAIT_SECONDS
616618
SLACK_BROKER_DEDUPE_TTL_MS
617619
BAUDBOT_AGENT_USER
618620
BAUDBOT_AGENT_HOME

slack-bridge/broker-bridge.mjs

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,32 @@ import {
2424
} from "./security.mjs";
2525
import {
2626
canonicalizeEnvelope,
27-
canonicalizeOutbound,
27+
canonicalizeProtocolRequest,
2828
canonicalizeSendRequest,
2929
} from "./crypto.mjs";
3030

3131
const SOCKET_DIR = path.join(homedir(), ".pi", "session-control");
3232
const AGENT_TIMEOUT_MS = 120_000;
33-
const API_PORT = parseInt(process.env.BRIDGE_API_PORT || "7890", 10);
34-
const POLL_INTERVAL_MS = parseInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "3000", 10);
35-
const MAX_MESSAGES = parseInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 10);
36-
const DEDUPE_TTL_MS = parseInt(process.env.SLACK_BROKER_DEDUPE_TTL_MS || String(20 * 60 * 1000), 10);
33+
34+
function clampInt(value, min, max, fallback) {
35+
const parsed = Number.parseInt(String(value ?? ""), 10);
36+
if (!Number.isFinite(parsed)) return fallback;
37+
return Math.min(max, Math.max(min, parsed));
38+
}
39+
40+
const API_PORT = clampInt(process.env.BRIDGE_API_PORT || "7890", 0, 65535, 7890);
41+
const POLL_INTERVAL_MS = clampInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "3000", 0, 60_000, 3000);
42+
const MAX_MESSAGES = clampInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 1, 100, 10);
43+
const MAX_WAIT_SECONDS = 25;
44+
const BROKER_WAIT_SECONDS = clampInt(process.env.SLACK_BROKER_WAIT_SECONDS || "20", 0, MAX_WAIT_SECONDS, 20);
45+
const DEDUPE_TTL_MS = clampInt(
46+
process.env.SLACK_BROKER_DEDUPE_TTL_MS || String(20 * 60 * 1000),
47+
1_000,
48+
7 * 24 * 60 * 60 * 1000,
49+
20 * 60 * 1000,
50+
);
3751
const MAX_BACKOFF_MS = 30_000;
52+
const INBOX_PROTOCOL_VERSION = "2026-02-1";
3853
const BROKER_HEALTH_PATH = path.join(homedir(), ".pi", "agent", "broker-health.json");
3954

4055
function ts() {
@@ -351,12 +366,25 @@ function getThreadId(channel, threadTs) {
351366
return id;
352367
}
353368

354-
function signRequest(action, timestamp, payloadField) {
355-
const canonical = canonicalizeOutbound(workspaceId, action, timestamp, payloadField);
369+
function signProtocolRequest(action, timestamp, payload) {
370+
const canonical = canonicalizeProtocolRequest(
371+
workspaceId,
372+
INBOX_PROTOCOL_VERSION,
373+
action,
374+
timestamp,
375+
payload,
376+
);
356377
const sig = sodium.crypto_sign_detached(canonical, cryptoState.serverSignSecretKey);
357378
return toBase64(sig);
358379
}
359380

381+
function signPullRequest(timestamp, maxMessages, waitSeconds) {
382+
return signProtocolRequest("inbox.pull", timestamp, {
383+
max_messages: maxMessages,
384+
wait_seconds: waitSeconds,
385+
});
386+
}
387+
360388
async function brokerFetch(pathname, body) {
361389
const url = `${brokerBaseUrl}${pathname}`;
362390
const response = await fetch(url, {
@@ -389,26 +417,30 @@ async function brokerFetch(pathname, body) {
389417

390418
async function pullInbox() {
391419
const timestamp = Math.floor(Date.now() / 1000);
392-
const signature = signRequest("inbox.pull", timestamp, String(MAX_MESSAGES));
420+
const signature = signPullRequest(timestamp, MAX_MESSAGES, BROKER_WAIT_SECONDS);
393421

394-
const payload = await brokerFetch("/api/inbox/pull", {
422+
const body = {
395423
workspace_id: workspaceId,
424+
protocol_version: INBOX_PROTOCOL_VERSION,
396425
max_messages: MAX_MESSAGES,
426+
wait_seconds: BROKER_WAIT_SECONDS,
397427
timestamp,
398428
signature,
399-
});
429+
};
430+
431+
const payload = await brokerFetch("/api/inbox/pull", body);
400432

401433
return Array.isArray(payload.messages) ? payload.messages : [];
402434
}
403435

404436
async function ackInbox(messageIds) {
405437
if (messageIds.length === 0) return;
406438
const timestamp = Math.floor(Date.now() / 1000);
407-
const joined = messageIds.join(",");
408-
const signature = signRequest("inbox.ack", timestamp, joined);
439+
const signature = signProtocolRequest("inbox.ack", timestamp, { message_ids: messageIds });
409440

410441
await brokerFetch("/api/inbox/ack", {
411442
workspace_id: workspaceId,
443+
protocol_version: INBOX_PROTOCOL_VERSION,
412444
message_ids: messageIds,
413445
timestamp,
414446
signature,
@@ -918,7 +950,9 @@ async function startPollLoop() {
918950
}
919951

920952
backoffMs = POLL_INTERVAL_MS;
921-
await sleep(POLL_INTERVAL_MS);
953+
if (BROKER_WAIT_SECONDS <= 0) {
954+
await sleep(POLL_INTERVAL_MS);
955+
}
922956
} catch (err) {
923957
if (!pollSucceeded) {
924958
markHealth("poll", false, err);
@@ -960,7 +994,11 @@ async function startPollLoop() {
960994
logInfo(` outbound mode: ${outboundMode} ${outboundMode === "direct" ? "(using SLACK_BOT_TOKEN)" : "(via broker)"}`);
961995
logInfo(` broker: ${brokerBaseUrl}`);
962996
logInfo(` workspace: ${workspaceId}`);
963-
logInfo(` poll interval: ${POLL_INTERVAL_MS}ms, max messages: ${MAX_MESSAGES}`);
997+
logInfo(` inbox protocol: ${INBOX_PROTOCOL_VERSION}`);
998+
logInfo(
999+
` poll mode: ${BROKER_WAIT_SECONDS > 0 ? `long-poll (${BROKER_WAIT_SECONDS}s)` : "short-poll"}, ` +
1000+
`interval: ${POLL_INTERVAL_MS}ms, max messages: ${MAX_MESSAGES}`,
1001+
);
9641002
logInfo(` allowed users: ${ALLOWED_USERS.length || "all"}`);
9651003
logInfo(` pi socket: ${socketPath || "(not found — will retry on message)"}`);
9661004
await startPollLoop();

slack-bridge/crypto.mjs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,24 @@ export function canonicalizeOutbound(workspace, action, timestamp, encryptedBody
4646
return utf8Bytes(`${workspace}|${action}|${timestamp}|${encryptedBody}`);
4747
}
4848

49+
/**
50+
* Construct canonical bytes for protocol-versioned inbox pull/ack signing.
51+
*
52+
* Uses deterministic JSON serialization (sorted keys) to match broker
53+
* json-stable-stringify canonicalization.
54+
*/
55+
export function canonicalizeProtocolRequest(workspace, protocolVersion, action, timestamp, payload) {
56+
return utf8Bytes(
57+
stableStringify({
58+
workspace_id: workspace,
59+
protocol_version: protocolVersion,
60+
action,
61+
timestamp,
62+
payload,
63+
}),
64+
);
65+
}
66+
4967
/**
5068
* Construct canonical bytes for /api/send request signing.
5169
*

slack-bridge/crypto.test.mjs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
stableStringify,
1111
canonicalizeEnvelope,
1212
canonicalizeOutbound,
13+
canonicalizeProtocolRequest,
1314
canonicalizeSendRequest,
1415
} from "./crypto.mjs";
1516

@@ -151,6 +152,30 @@ describe("canonicalizeOutbound", () => {
151152
});
152153
});
153154

155+
// ── canonicalizeProtocolRequest ─────────────────────────────────────────────
156+
157+
describe("canonicalizeProtocolRequest", () => {
158+
it("produces stable JSON payload for protocol-versioned inbox.pull", () => {
159+
const result = decode(
160+
canonicalizeProtocolRequest("T123", "2026-02-1", "inbox.pull", 1700000000, {
161+
max_messages: 10,
162+
wait_seconds: 20,
163+
}),
164+
);
165+
assert.equal(
166+
result,
167+
'{"action":"inbox.pull","payload":{"max_messages":10,"wait_seconds":20},"protocol_version":"2026-02-1","timestamp":1700000000,"workspace_id":"T123"}',
168+
);
169+
});
170+
171+
it("returns Uint8Array", () => {
172+
const result = canonicalizeProtocolRequest("T123", "2026-02-1", "inbox.ack", 1700000000, {
173+
message_ids: ["m1", "m2"],
174+
});
175+
assert.ok(result instanceof Uint8Array);
176+
});
177+
});
178+
154179
// ── canonicalizeSendRequest ─────────────────────────────────────────────────
155180

156181
describe("canonicalizeSendRequest", () => {

0 commit comments

Comments
 (0)