Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .env.schema
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ SLACK_BROKER_PUBLIC_KEY=
# @sensitive=false @type=string
SLACK_BROKER_SIGNING_PUBLIC_KEY=

# Optional broker-issued bearer token for broker API auth
# @type=string
SLACK_BROKER_ACCESS_TOKEN=

# Optional broker token expiration timestamp (ISO-8601)
# @sensitive=false @type=string
SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT=

# Optional broker token scopes (comma-separated)
# @sensitive=false @type=string
SLACK_BROKER_ACCESS_TOKEN_SCOPES=

# Broker pull cadence in milliseconds (default: 3000)
# @sensitive=false @type=number
SLACK_BROKER_POLL_INTERVAL_MS=3000
Expand Down
7 changes: 7 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ Set by `sudo baudbot broker register` when using brokered Slack OAuth flow.
| `SLACK_BROKER_SERVER_SIGNING_PUBLIC_KEY` | Server Ed25519 public signing key (base64) |
| `SLACK_BROKER_PUBLIC_KEY` | Broker X25519 public key (base64) |
| `SLACK_BROKER_SIGNING_PUBLIC_KEY` | Broker Ed25519 public signing key (base64) |
| `SLACK_BROKER_ACCESS_TOKEN` | Optional broker-issued bearer token for broker API auth (used when broker enforces agent tokens) |
| `SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT` | Optional ISO timestamp for broker token expiry |
| `SLACK_BROKER_ACCESS_TOKEN_SCOPES` | Optional comma-separated broker token scopes |
| `SLACK_BROKER_POLL_INTERVAL_MS` | Inbox poll interval in milliseconds (default: `3000`) |
| `SLACK_BROKER_MAX_MESSAGES` | Max leased messages per poll request (default: `10`) |
| `SLACK_BROKER_WAIT_SECONDS` | Long-poll wait window for `/api/inbox/pull` (default: `20`, set `0` for immediate short-poll, max `25`) |
Expand Down Expand Up @@ -189,6 +192,10 @@ SENTRY_CHANNEL_ID=C0987654321
# Slack broker registration (optional, set by: sudo baudbot broker register)
SLACK_BROKER_URL=https://broker.example.com
SLACK_BROKER_WORKSPACE_ID=T0123ABCD
# Optional broker auth token fields (set by broker register when provided)
# SLACK_BROKER_ACCESS_TOKEN=...
# SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT=2026-02-22T22:15:00.000Z
# SLACK_BROKER_ACCESS_TOKEN_SCOPES=slack.send,inbox.pull,inbox.ack
SLACK_BROKER_POLL_INTERVAL_MS=3000
SLACK_BROKER_MAX_MESSAGES=10
SLACK_BROKER_WAIT_SECONDS=20
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ sudo baudbot broker register \
```

Broker pull mode uses long-polling by default (`SLACK_BROKER_WAIT_SECONDS=20`, max `25`; set `0` for immediate short-poll behavior).
When broker agent-token auth is enabled server-side, `baudbot broker register` stores broker token fields in env and broker-mode outbound requests include `Authorization: Bearer ...` automatically.

Need to rotate/update a key later?

Expand Down
15 changes: 15 additions & 0 deletions bin/broker-register.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ export async function registerWithBroker({
return {
broker_pubkey: registerBrokerPubkey || fetchedBrokerKeys.broker_pubkey,
broker_signing_pubkey: registerBrokerSigningPubkey || fetchedBrokerKeys.broker_signing_pubkey,
broker_access_token: body?.broker_access_token,
broker_access_token_expires_at: body?.broker_access_token_expires_at,
broker_access_token_scopes: Array.isArray(body?.broker_access_token_scopes)
? body.broker_access_token_scopes.filter((scope) => typeof scope === "string")
: undefined,
decrypted_bot_token: decryptedBotToken,
request_payload: payload,
};
Expand Down Expand Up @@ -573,6 +578,16 @@ export async function runRegistration({
SLACK_BROKER_SIGNING_PUBLIC_KEY: registration.broker_signing_pubkey,
};

if (registration.broker_access_token) {
updates.SLACK_BROKER_ACCESS_TOKEN = registration.broker_access_token;
}
if (registration.broker_access_token_expires_at) {
updates.SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT = registration.broker_access_token_expires_at;
}
if (registration.broker_access_token_scopes?.length) {
updates.SLACK_BROKER_ACCESS_TOKEN_SCOPES = registration.broker_access_token_scopes.join(",");
}

// Add the decrypted bot token if available
if (registration.decrypted_bot_token) {
updates.SLACK_BOT_TOKEN = registration.decrypted_bot_token;
Expand Down
18 changes: 17 additions & 1 deletion bin/broker-register.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ test("registerWithBroker fetches pubkeys then posts registration payload", async
ok: true,
broker_pubkey: Buffer.alloc(32, 9).toString("base64"),
broker_signing_pubkey: Buffer.alloc(32, 8).toString("base64"),
broker_access_token: "tok-abc",
broker_access_token_expires_at: "2026-02-22T22:00:00.000Z",
broker_access_token_scopes: ["slack.send", "inbox.pull"],
});
}

Expand All @@ -148,6 +151,9 @@ test("registerWithBroker fetches pubkeys then posts registration payload", async
assert.match(calls[1].url, /\/api\/register$/);
assert.equal(result.broker_pubkey, Buffer.alloc(32, 9).toString("base64"));
assert.equal(result.broker_signing_pubkey, Buffer.alloc(32, 8).toString("base64"));
assert.equal(result.broker_access_token, "tok-abc");
assert.equal(result.broker_access_token_expires_at, "2026-02-22T22:00:00.000Z");
assert.deepEqual(result.broker_access_token_scopes, ["slack.send", "inbox.pull"]);
});

test("registerWithBroker sends registration_token when provided", async () => {
Expand Down Expand Up @@ -201,7 +207,14 @@ test("runRegistration integration path succeeds against live local HTTP server",
for await (const chunk of req) raw += chunk;
receivedRegisterPayload = JSON.parse(raw);
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, broker_pubkey: brokerPubkey, broker_signing_pubkey: brokerSigningPubkey }));
res.end(JSON.stringify({
ok: true,
broker_pubkey: brokerPubkey,
broker_signing_pubkey: brokerSigningPubkey,
broker_access_token: "tok-live",
broker_access_token_expires_at: "2026-02-22T22:00:00.000Z",
broker_access_token_scopes: ["slack.send"],
}));
return;
}

Expand All @@ -228,6 +241,9 @@ test("runRegistration integration path succeeds against live local HTTP server",
assert.ok(result.updates.SLACK_BROKER_SERVER_SIGNING_PRIVATE_KEY);
assert.equal(result.updates.SLACK_BROKER_PUBLIC_KEY, brokerPubkey);
assert.equal(result.updates.SLACK_BROKER_SIGNING_PUBLIC_KEY, brokerSigningPubkey);
assert.equal(result.updates.SLACK_BROKER_ACCESS_TOKEN, "tok-live");
assert.equal(result.updates.SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT, "2026-02-22T22:00:00.000Z");
assert.equal(result.updates.SLACK_BROKER_ACCESS_TOKEN_SCOPES, "slack.send");
} finally {
await new Promise((resolve, reject) => server.close((err) => (err ? reject(err) : resolve())));
}
Expand Down
6 changes: 6 additions & 0 deletions bin/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ else
SLACK_BROKER_SERVER_SIGNING_PUBLIC_KEY \
SLACK_BROKER_PUBLIC_KEY \
SLACK_BROKER_SIGNING_PUBLIC_KEY \
SLACK_BROKER_ACCESS_TOKEN \
SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT \
SLACK_BROKER_ACCESS_TOKEN_SCOPES \
SLACK_BROKER_POLL_INTERVAL_MS \
SLACK_BROKER_MAX_MESSAGES \
SLACK_BROKER_WAIT_SECONDS \
Expand Down Expand Up @@ -612,6 +615,9 @@ ordered_keys=(
SLACK_BROKER_SERVER_SIGNING_PUBLIC_KEY
SLACK_BROKER_PUBLIC_KEY
SLACK_BROKER_SIGNING_PUBLIC_KEY
SLACK_BROKER_ACCESS_TOKEN
SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT
SLACK_BROKER_ACCESS_TOKEN_SCOPES
SLACK_BROKER_POLL_INTERVAL_MS
SLACK_BROKER_MAX_MESSAGES
SLACK_BROKER_WAIT_SECONDS
Expand Down
34 changes: 33 additions & 1 deletion slack-bridge/broker-bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ const directSlackRateLimiter = createRateLimiter({ maxRequests: 1, windowMs: 1_0

const workspaceId = process.env.SLACK_BROKER_WORKSPACE_ID;
const brokerBaseUrl = String(process.env.SLACK_BROKER_URL || "").replace(/\/$/, "");
const brokerAccessToken = String(process.env.SLACK_BROKER_ACCESS_TOKEN || "").trim();
Comment thread
sentry[bot] marked this conversation as resolved.
const brokerAccessTokenExpiresAt = String(process.env.SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT || "").trim();

// Check if direct Slack API mode is available
const hasDirectSlackToken = Boolean(process.env.SLACK_BOT_TOKEN);
Expand All @@ -109,6 +111,7 @@ let socketPath = null;
let cryptoState = null;

const dedupe = new Map();
let brokerTokenExpiryFormatWarned = false;

const brokerHealth = {
started_at: new Date().toISOString(),
Expand Down Expand Up @@ -385,11 +388,37 @@ function signPullRequest(timestamp, maxMessages, waitSeconds) {
});
}

function isBrokerAccessTokenExpired() {
if (!brokerAccessToken || !brokerAccessTokenExpiresAt) return false;
const ts = Date.parse(brokerAccessTokenExpiresAt);
if (!Number.isFinite(ts)) {
if (!brokerTokenExpiryFormatWarned) {
logWarn("⚠️ invalid SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT format; expected ISO-8601 timestamp");
brokerTokenExpiryFormatWarned = true;
}
return false;
}
return Date.now() >= ts;
}

function enforceBrokerTokenFreshnessOrExit() {
if (!isBrokerAccessTokenExpired()) return;

logError("❌ broker access token is expired; broker API auth will fail.");
logError(" run: sudo baudbot broker register && sudo baudbot restart");
process.exit(1);
}

async function brokerFetch(pathname, body) {
enforceBrokerTokenFreshnessOrExit();
const url = `${brokerBaseUrl}${pathname}`;
const headers = { "Content-Type": "application/json" };
if (brokerAccessToken) {
headers.Authorization = `Bearer ${brokerAccessToken}`;
}
const response = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
headers,
body: JSON.stringify(body),
});

Expand Down Expand Up @@ -987,6 +1016,8 @@ async function startPollLoop() {
serverSignSecretKey: signKeypair.privateKey,
};

enforceBrokerTokenFreshnessOrExit();

refreshSocket();
startApiServer();
persistBrokerHealth();
Expand All @@ -995,6 +1026,7 @@ async function startPollLoop() {
logInfo(` broker: ${brokerBaseUrl}`);
logInfo(` workspace: ${workspaceId}`);
logInfo(` inbox protocol: ${INBOX_PROTOCOL_VERSION}`);
logInfo(` broker auth token: ${brokerAccessToken ? "configured" : "not configured"}`);
logInfo(
` poll mode: ${BROKER_WAIT_SECONDS > 0 ? `long-poll (${BROKER_WAIT_SECONDS}s)` : "short-poll"}, ` +
`interval: ${POLL_INTERVAL_MS}ms, max messages: ${MAX_MESSAGES}`,
Expand Down
153 changes: 153 additions & 0 deletions test/broker-bridge.integration.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,24 @@ function waitFor(condition, timeoutMs = 10_000, intervalMs = 50, onTimeoutMessag
});
}

async function reserveFreePort() {
const server = createServer((_req, res) => {
res.writeHead(204);
res.end();
});

await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve));
const address = server.address();
if (!address || typeof address === "string") {
await new Promise((resolve) => server.close(() => resolve(undefined)));
throw new Error("failed to reserve free port");
}

const port = address.port;
await new Promise((resolve) => server.close(() => resolve(undefined)));
return port;
}

describe("broker pull bridge semi-integration", () => {
const children = [];
const servers = [];
Expand Down Expand Up @@ -621,4 +639,139 @@ describe("broker pull bridge semi-integration", () => {

bridge.kill("SIGTERM");
});

it("sends broker bearer token when configured", async () => {
await sodium.ready;

const workspaceId = "T123BROKER";
const bridgeApiPort = await reserveFreePort();
let outboundAuthorization = null;

const broker = createServer(async (req, res) => {
if (req.method === "POST" && req.url === "/api/inbox/pull") {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, messages: [] }));
return;
}

if (req.method === "POST" && req.url === "/api/send") {
outboundAuthorization = req.headers.authorization || null;
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: "1234.5678" }));
return;
}

if (req.method === "POST" && req.url === "/api/inbox/ack") {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, acked: 0 }));
return;
}

res.writeHead(404, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: false, error: "not found" }));
});

await new Promise((resolve) => broker.listen(0, "127.0.0.1", resolve));
servers.push(broker);

const address = broker.address();
if (!address || typeof address === "string") {
throw new Error("failed to get broker test server address");
}
const brokerUrl = `http://127.0.0.1:${address.port}`;

const testFileDir = path.dirname(fileURLToPath(import.meta.url));
const repoRoot = path.dirname(testFileDir);
const bridgePath = path.join(repoRoot, "slack-bridge", "broker-bridge.mjs");
const bridgeCwd = path.join(repoRoot, "slack-bridge");

const bridge = spawn("node", [bridgePath], {
cwd: bridgeCwd,
env: {
...process.env,
SLACK_BROKER_URL: brokerUrl,
SLACK_BROKER_WORKSPACE_ID: workspaceId,
SLACK_BROKER_SERVER_PRIVATE_KEY: b64(32, 11),
SLACK_BROKER_SERVER_PUBLIC_KEY: b64(32, 12),
SLACK_BROKER_SERVER_SIGNING_PRIVATE_KEY: Buffer.alloc(32, 24).toString("base64"),
SLACK_BROKER_PUBLIC_KEY: b64(32, 14),
SLACK_BROKER_SIGNING_PUBLIC_KEY: b64(32, 15),
SLACK_BROKER_ACCESS_TOKEN: "test-broker-token",
SLACK_ALLOWED_USERS: "U_ALLOWED",
SLACK_BROKER_POLL_INTERVAL_MS: "50",
SLACK_BROKER_WAIT_SECONDS: "0",
BRIDGE_API_PORT: String(bridgeApiPort),
},
stdio: ["ignore", "pipe", "pipe"],
});
children.push(bridge);

const start = Date.now();
// Bridge local API may not be ready immediately after spawn; retry until it accepts /send.
while (Date.now() - start < 10_000) {
try {
const res = await fetch(`http://127.0.0.1:${bridgeApiPort}/send`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ channel: "C123", text: "hello" }),
});
if (res.ok) break;
} catch {
// retry while bridge boots
}
await new Promise((resolve) => setTimeout(resolve, 100));
}

await waitFor(() => outboundAuthorization !== null, 10_000, 50, "timeout waiting for broker /api/send call");
expect(outboundAuthorization).toBe("Bearer test-broker-token");

bridge.kill("SIGTERM");
});

it("exits when broker access token is expired", async () => {
await sodium.ready;

const testFileDir = path.dirname(fileURLToPath(import.meta.url));
const repoRoot = path.dirname(testFileDir);
const bridgePath = path.join(repoRoot, "slack-bridge", "broker-bridge.mjs");
const bridgeCwd = path.join(repoRoot, "slack-bridge");

let bridgeStdout = "";
let bridgeStderr = "";

const bridge = spawn("node", [bridgePath], {
cwd: bridgeCwd,
env: {
...process.env,
SLACK_BROKER_URL: "http://127.0.0.1:65535",
SLACK_BROKER_WORKSPACE_ID: "T123BROKER",
SLACK_BROKER_SERVER_PRIVATE_KEY: b64(32, 11),
SLACK_BROKER_SERVER_PUBLIC_KEY: b64(32, 12),
SLACK_BROKER_SERVER_SIGNING_PRIVATE_KEY: Buffer.alloc(32, 25).toString("base64"),
SLACK_BROKER_PUBLIC_KEY: b64(32, 14),
SLACK_BROKER_SIGNING_PUBLIC_KEY: b64(32, 15),
SLACK_BROKER_ACCESS_TOKEN: "expired-token",
SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT: "2000-01-01T00:00:00.000Z",
SLACK_ALLOWED_USERS: "U_ALLOWED",
BRIDGE_API_PORT: "0",
},
stdio: ["ignore", "pipe", "pipe"],
});

bridge.stdout.on("data", (chunk) => {
bridgeStdout += chunk.toString();
});
bridge.stderr.on("data", (chunk) => {
bridgeStderr += chunk.toString();
});

children.push(bridge);

const exited = await new Promise((resolve) => {
bridge.on("exit", (code, signal) => resolve({ code, signal }));
});

expect(exited.code).toBe(1);
expect(`${bridgeStdout}\n${bridgeStderr}`).toContain("broker access token is expired");
});
});