Skip to content

Commit 5cf2201

Browse files
committed
bridge: add broker long-poll v2 pull signing
1 parent 55acd30 commit 5cf2201

7 files changed

Lines changed: 353 additions & 10 deletions

File tree

CONFIGURATION.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ Set by `sudo baudbot broker register` when using brokered Slack OAuth flow.
106106
| `SLACK_BROKER_SIGNING_PUBLIC_KEY` | Broker Ed25519 public signing key (base64) |
107107
| `SLACK_BROKER_POLL_INTERVAL_MS` | Inbox poll interval in milliseconds (default: `3000`) |
108108
| `SLACK_BROKER_MAX_MESSAGES` | Max leased messages per poll request (default: `10`) |
109+
| `SLACK_BROKER_WAIT_SECONDS` | Long-poll wait window for `/api/inbox/pull` (default: `20`, set `0` for legacy short-poll, max `25`) |
109110
| `SLACK_BROKER_DEDUPE_TTL_MS` | Dedupe cache TTL in milliseconds (default: `1200000`) |
110111

111112
### Kernel (Cloud Browsers)
@@ -190,6 +191,7 @@ SLACK_BROKER_URL=https://broker.example.com
190191
SLACK_BROKER_WORKSPACE_ID=T0123ABCD
191192
SLACK_BROKER_POLL_INTERVAL_MS=3000
192193
SLACK_BROKER_MAX_MESSAGES=10
194+
SLACK_BROKER_WAIT_SECONDS=20
193195
SLACK_BROKER_DEDUPE_TTL_MS=1200000
194196

195197
# Experimental features (required for email)

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ sudo baudbot broker register \
9898
--registration-token <token-from-dashboard-callback>
9999
```
100100

101+
Broker pull mode uses long-polling by default (`SLACK_BROKER_WAIT_SECONDS=20`, max `25`; set `0` for legacy short-poll).
102+
101103
Need to rotate/update a key later?
102104

103105
```bash

bin/config.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,7 @@ else
451451
SLACK_BROKER_SIGNING_PUBLIC_KEY \
452452
SLACK_BROKER_POLL_INTERVAL_MS \
453453
SLACK_BROKER_MAX_MESSAGES \
454+
SLACK_BROKER_WAIT_SECONDS \
454455
SLACK_BROKER_DEDUPE_TTL_MS
455456

456457
prompt_secret "SLACK_BOT_TOKEN" \
@@ -613,6 +614,7 @@ ordered_keys=(
613614
SLACK_BROKER_SIGNING_PUBLIC_KEY
614615
SLACK_BROKER_POLL_INTERVAL_MS
615616
SLACK_BROKER_MAX_MESSAGES
617+
SLACK_BROKER_WAIT_SECONDS
616618
SLACK_BROKER_DEDUPE_TTL_MS
617619
BAUDBOT_AGENT_USER
618620
BAUDBOT_AGENT_HOME

slack-bridge/broker-bridge.mjs

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,30 @@ import {
2525
import {
2626
canonicalizeEnvelope,
2727
canonicalizeOutbound,
28+
canonicalizeOutboundV2,
2829
canonicalizeSendRequest,
2930
} from "./crypto.mjs";
3031

3132
const SOCKET_DIR = path.join(homedir(), ".pi", "session-control");
3233
const AGENT_TIMEOUT_MS = 120_000;
33-
const API_PORT = parseInt(process.env.BRIDGE_API_PORT || "7890", 10);
34-
const POLL_INTERVAL_MS = parseInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "3000", 10);
35-
const MAX_MESSAGES = parseInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 10);
36-
const DEDUPE_TTL_MS = parseInt(process.env.SLACK_BROKER_DEDUPE_TTL_MS || String(20 * 60 * 1000), 10);
34+
35+
function clampInt(value, min, max, fallback) {
36+
const parsed = Number.parseInt(String(value ?? ""), 10);
37+
if (!Number.isFinite(parsed)) return fallback;
38+
return Math.min(max, Math.max(min, parsed));
39+
}
40+
41+
const API_PORT = clampInt(process.env.BRIDGE_API_PORT || "7890", 1, 65535, 7890);
42+
const POLL_INTERVAL_MS = clampInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "3000", 0, 60_000, 3000);
43+
const MAX_MESSAGES = clampInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 1, 100, 10);
44+
const MAX_WAIT_SECONDS = 25;
45+
const BROKER_WAIT_SECONDS = clampInt(process.env.SLACK_BROKER_WAIT_SECONDS || "20", 0, MAX_WAIT_SECONDS, 20);
46+
const DEDUPE_TTL_MS = clampInt(
47+
process.env.SLACK_BROKER_DEDUPE_TTL_MS || String(20 * 60 * 1000),
48+
1_000,
49+
7 * 24 * 60 * 60 * 1000,
50+
20 * 60 * 1000,
51+
);
3752
const MAX_BACKOFF_MS = 30_000;
3853
const BROKER_HEALTH_PATH = path.join(homedir(), ".pi", "agent", "broker-health.json");
3954

@@ -357,6 +372,19 @@ function signRequest(action, timestamp, payloadField) {
357372
return toBase64(sig);
358373
}
359374

375+
function signPullRequest(timestamp, maxMessages, waitSeconds) {
376+
if (waitSeconds <= 0) {
377+
return signRequest("inbox.pull", timestamp, String(maxMessages));
378+
}
379+
380+
const canonical = canonicalizeOutboundV2(workspaceId, "inbox.pull.v2", timestamp, {
381+
max_messages: maxMessages,
382+
wait_seconds: waitSeconds,
383+
});
384+
const sig = sodium.crypto_sign_detached(canonical, cryptoState.serverSignSecretKey);
385+
return toBase64(sig);
386+
}
387+
360388
async function brokerFetch(pathname, body) {
361389
const url = `${brokerBaseUrl}${pathname}`;
362390
const response = await fetch(url, {
@@ -389,14 +417,17 @@ async function brokerFetch(pathname, body) {
389417

390418
async function pullInbox() {
391419
const timestamp = Math.floor(Date.now() / 1000);
392-
const signature = signRequest("inbox.pull", timestamp, String(MAX_MESSAGES));
420+
const signature = signPullRequest(timestamp, MAX_MESSAGES, BROKER_WAIT_SECONDS);
393421

394-
const payload = await brokerFetch("/api/inbox/pull", {
422+
const body = {
395423
workspace_id: workspaceId,
396424
max_messages: MAX_MESSAGES,
425+
...(BROKER_WAIT_SECONDS > 0 ? { wait_seconds: BROKER_WAIT_SECONDS } : {}),
397426
timestamp,
398427
signature,
399-
});
428+
};
429+
430+
const payload = await brokerFetch("/api/inbox/pull", body);
400431

401432
return Array.isArray(payload.messages) ? payload.messages : [];
402433
}
@@ -918,7 +949,9 @@ async function startPollLoop() {
918949
}
919950

920951
backoffMs = POLL_INTERVAL_MS;
921-
await sleep(POLL_INTERVAL_MS);
952+
if (BROKER_WAIT_SECONDS <= 0) {
953+
await sleep(POLL_INTERVAL_MS);
954+
}
922955
} catch (err) {
923956
if (!pollSucceeded) {
924957
markHealth("poll", false, err);
@@ -960,7 +993,10 @@ async function startPollLoop() {
960993
logInfo(` outbound mode: ${outboundMode} ${outboundMode === "direct" ? "(using SLACK_BOT_TOKEN)" : "(via broker)"}`);
961994
logInfo(` broker: ${brokerBaseUrl}`);
962995
logInfo(` workspace: ${workspaceId}`);
963-
logInfo(` poll interval: ${POLL_INTERVAL_MS}ms, max messages: ${MAX_MESSAGES}`);
996+
logInfo(
997+
` poll mode: ${BROKER_WAIT_SECONDS > 0 ? `long-poll (${BROKER_WAIT_SECONDS}s)` : "short-poll"}, ` +
998+
`interval: ${POLL_INTERVAL_MS}ms, max messages: ${MAX_MESSAGES}`,
999+
);
9641000
logInfo(` allowed users: ${ALLOWED_USERS.length || "all"}`);
9651001
logInfo(` pi socket: ${socketPath || "(not found — will retry on message)"}`);
9661002
await startPollLoop();

slack-bridge/crypto.mjs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,23 @@ export function canonicalizeOutbound(workspace, action, timestamp, encryptedBody
4646
return utf8Bytes(`${workspace}|${action}|${timestamp}|${encryptedBody}`);
4747
}
4848

49+
/**
50+
* Construct canonical bytes for v2 outbound request signing.
51+
*
52+
* Uses deterministic JSON serialization (sorted keys) to match broker
53+
* json-stable-stringify canonicalization.
54+
*/
55+
export function canonicalizeOutboundV2(workspace, action, timestamp, payload) {
56+
return utf8Bytes(
57+
stableStringify({
58+
workspace_id: workspace,
59+
action,
60+
timestamp,
61+
payload,
62+
}),
63+
);
64+
}
65+
4966
/**
5067
* Construct canonical bytes for /api/send request signing.
5168
*

slack-bridge/crypto.test.mjs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
stableStringify,
1111
canonicalizeEnvelope,
1212
canonicalizeOutbound,
13+
canonicalizeOutboundV2,
1314
canonicalizeSendRequest,
1415
} from "./crypto.mjs";
1516

@@ -151,6 +152,31 @@ describe("canonicalizeOutbound", () => {
151152
});
152153
});
153154

155+
// ── canonicalizeOutboundV2 ──────────────────────────────────────────────────
156+
157+
describe("canonicalizeOutboundV2", () => {
158+
it("produces stable JSON payload for inbox.pull.v2", () => {
159+
const result = decode(
160+
canonicalizeOutboundV2("T123", "inbox.pull.v2", 1700000000, {
161+
max_messages: 10,
162+
wait_seconds: 20,
163+
}),
164+
);
165+
assert.equal(
166+
result,
167+
'{"action":"inbox.pull.v2","payload":{"max_messages":10,"wait_seconds":20},"timestamp":1700000000,"workspace_id":"T123"}',
168+
);
169+
});
170+
171+
it("returns Uint8Array", () => {
172+
const result = canonicalizeOutboundV2("T123", "inbox.pull.v2", 1700000000, {
173+
max_messages: 10,
174+
wait_seconds: 20,
175+
});
176+
assert.ok(result instanceof Uint8Array);
177+
});
178+
});
179+
154180
// ── canonicalizeSendRequest ─────────────────────────────────────────────────
155181

156182
describe("canonicalizeSendRequest", () => {

0 commit comments

Comments
 (0)