Skip to content

Commit ddc1d2b

Browse files
committed
bridge: add in-memory /logs endpoint for broker bridge
1 parent 1b23c11 commit ddc1d2b

2 files changed

Lines changed: 205 additions & 14 deletions

File tree

slack-bridge/broker-bridge.mjs

Lines changed: 103 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,21 +51,64 @@ const DEDUPE_TTL_MS = clampInt(
5151
const MAX_BACKOFF_MS = 30_000;
5252
const INBOX_PROTOCOL_VERSION = "2026-02-1";
5353
const BROKER_HEALTH_PATH = path.join(homedir(), ".pi", "agent", "broker-health.json");
54+
const LOG_BUFFER_MAX_LINES = 1000;
55+
56+
const logLineBuffer = [];
5457

5558
function ts() {
5659
return new Date().toISOString();
5760
}
5861

62+
function formatLogArg(arg) {
63+
if (typeof arg === "string") return arg;
64+
if (arg instanceof Error) return arg.stack || arg.message;
65+
try {
66+
return JSON.stringify(arg);
67+
} catch {
68+
return String(arg);
69+
}
70+
}
71+
72+
function pushLogLine(line) {
73+
const lines = String(line).split(/\r?\n/);
74+
for (const rawLine of lines) {
75+
const normalizedLine = rawLine.trimEnd();
76+
if (!normalizedLine) continue;
77+
logLineBuffer.push(normalizedLine);
78+
}
79+
80+
const overflow = logLineBuffer.length - LOG_BUFFER_MAX_LINES;
81+
if (overflow > 0) {
82+
logLineBuffer.splice(0, overflow);
83+
}
84+
}
85+
86+
function logWithLevel(level, ...args) {
87+
const timestampPrefix = `[${ts()}]`;
88+
const line = [timestampPrefix, ...args.map(formatLogArg)].join(" ");
89+
pushLogLine(line);
90+
91+
if (level === "error") {
92+
console.error(timestampPrefix, ...args);
93+
return;
94+
}
95+
if (level === "warn") {
96+
console.warn(timestampPrefix, ...args);
97+
return;
98+
}
99+
console.log(timestampPrefix, ...args);
100+
}
101+
59102
function logInfo(...args) {
60-
console.log(`[${ts()}]`, ...args);
103+
logWithLevel("info", ...args);
61104
}
62105

63106
function logError(...args) {
64-
console.error(`[${ts()}]`, ...args);
107+
logWithLevel("error", ...args);
65108
}
66109

67110
function logWarn(...args) {
68-
console.warn(`[${ts()}]`, ...args);
111+
logWithLevel("warn", ...args);
69112
}
70113

71114
for (const key of [
@@ -669,13 +712,37 @@ async function processPulledMessage(message) {
669712
return true;
670713
}
671714

715+
function getLogLinesForResponse(url) {
716+
const nParam = url.searchParams.get("n");
717+
const filterParam = url.searchParams.get("filter");
718+
719+
let requestedLineCount = null;
720+
if (nParam !== null) {
721+
const parsedN = Number.parseInt(nParam, 10);
722+
if (!Number.isFinite(parsedN) || parsedN < 1) {
723+
throw new Error("n must be a positive integer");
724+
}
725+
requestedLineCount = Math.min(parsedN, LOG_BUFFER_MAX_LINES);
726+
}
727+
728+
let lines = logLineBuffer;
729+
730+
const normalizedFilter = filterParam?.trim().toLowerCase();
731+
if (normalizedFilter) {
732+
lines = lines.filter((line) => line.toLowerCase().includes(normalizedFilter));
733+
}
734+
735+
if (requestedLineCount !== null) {
736+
lines = lines.slice(-requestedLineCount);
737+
}
738+
739+
return lines;
740+
}
741+
672742
function startApiServer() {
673743
const server = createServer(async (req, res) => {
674-
if (req.method !== "POST") {
675-
res.writeHead(405, { "Content-Type": "application/json" });
676-
res.end(JSON.stringify({ error: "Method not allowed" }));
677-
return;
678-
}
744+
const url = new URL(req.url, `http://localhost:${API_PORT}`);
745+
const pathname = url.pathname;
679746

680747
const remoteAddr = req.socket.remoteAddress;
681748
if (remoteAddr !== "127.0.0.1" && remoteAddr !== "::1" && remoteAddr !== "::ffff:127.0.0.1") {
@@ -690,6 +757,31 @@ function startApiServer() {
690757
return;
691758
}
692759

760+
if (pathname === "/logs") {
761+
if (req.method !== "GET") {
762+
res.writeHead(405, { "Content-Type": "application/json" });
763+
res.end(JSON.stringify({ error: "Method not allowed" }));
764+
return;
765+
}
766+
767+
try {
768+
const lines = getLogLinesForResponse(url);
769+
res.writeHead(200, { "Content-Type": "text/plain; charset=utf-8" });
770+
res.end(lines.length > 0 ? `${lines.join("\n")}\n` : "");
771+
return;
772+
} catch (err) {
773+
res.writeHead(400, { "Content-Type": "application/json" });
774+
res.end(JSON.stringify({ error: err instanceof Error ? err.message : "invalid query params" }));
775+
return;
776+
}
777+
}
778+
779+
if (req.method !== "POST") {
780+
res.writeHead(405, { "Content-Type": "application/json" });
781+
res.end(JSON.stringify({ error: "Method not allowed" }));
782+
return;
783+
}
784+
693785
let rawApiRequestBody = "";
694786
for await (const chunk of req) rawApiRequestBody += chunk;
695787

@@ -703,9 +795,6 @@ function startApiServer() {
703795
}
704796

705797
try {
706-
const url = new URL(req.url, `http://localhost:${API_PORT}`);
707-
const pathname = url.pathname;
708-
709798
if (pathname === "/send") {
710799
const validationError = validateSendParams(apiRequestBody);
711800
if (validationError) {
@@ -715,7 +804,7 @@ function startApiServer() {
715804
}
716805

717806
const { channel, text, thread_ts } = apiRequestBody;
718-
807+
719808
const result = await sendViaBroker({
720809
action: "chat.postMessage",
721810
routing: { channel, ...(thread_ts ? { thread_ts } : {}) },
@@ -767,7 +856,7 @@ function startApiServer() {
767856
}
768857

769858
const { channel, timestamp, emoji } = apiRequestBody;
770-
859+
771860
await sendViaBroker({
772861
action: "reactions.add",
773862
routing: { channel, timestamp, emoji },
@@ -780,7 +869,7 @@ function startApiServer() {
780869
}
781870

782871
res.writeHead(404, { "Content-Type": "application/json" });
783-
res.end(JSON.stringify({ error: "Not found. Endpoints: POST /send, POST /reply, POST /react" }));
872+
res.end(JSON.stringify({ error: "Not found. Endpoints: POST /send, POST /reply, POST /react, GET /logs" }));
784873
} catch (err) {
785874
res.writeHead(500, { "Content-Type": "application/json" });
786875
res.end(JSON.stringify({ error: err instanceof Error ? err.message : "unknown error" }));

test/broker-bridge.integration.test.mjs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,108 @@ describe("broker pull bridge semi-integration", () => {
7070
tempDirs.length = 0;
7171
});
7272

73+
it("serves in-memory recent logs via GET /logs", async () => {
74+
await sodium.ready;
75+
76+
const apiPort = await reserveFreePort();
77+
78+
const broker = createServer(async (req, res) => {
79+
if (req.method === "POST" && req.url === "/api/inbox/pull") {
80+
res.writeHead(200, { "Content-Type": "application/json" });
81+
res.end(JSON.stringify({ ok: true, messages: [] }));
82+
return;
83+
}
84+
85+
if (req.method === "POST" && req.url === "/api/inbox/ack") {
86+
res.writeHead(200, { "Content-Type": "application/json" });
87+
res.end(JSON.stringify({ ok: true, acked: 0 }));
88+
return;
89+
}
90+
91+
if (req.method === "POST" && req.url === "/api/send") {
92+
res.writeHead(200, { "Content-Type": "application/json" });
93+
res.end(JSON.stringify({ ok: true, ts: "1234.5678" }));
94+
return;
95+
}
96+
97+
res.writeHead(404, { "Content-Type": "application/json" });
98+
res.end(JSON.stringify({ ok: false, error: "not found" }));
99+
});
100+
101+
await new Promise((resolve) => broker.listen(0, "127.0.0.1", resolve));
102+
servers.push(broker);
103+
104+
const brokerAddress = broker.address();
105+
if (!brokerAddress || typeof brokerAddress === "string") {
106+
throw new Error("failed to get broker test server address");
107+
}
108+
109+
const testFileDir = path.dirname(fileURLToPath(import.meta.url));
110+
const repoRoot = path.dirname(testFileDir);
111+
const bridgePath = path.join(repoRoot, "slack-bridge", "broker-bridge.mjs");
112+
const bridgeCwd = path.join(repoRoot, "slack-bridge");
113+
114+
let bridgeStdout = "";
115+
let bridgeStderr = "";
116+
117+
const bridge = spawn("node", [bridgePath], {
118+
cwd: bridgeCwd,
119+
env: {
120+
...process.env,
121+
SLACK_BROKER_URL: `http://127.0.0.1:${brokerAddress.port}`,
122+
SLACK_BROKER_WORKSPACE_ID: "T123BROKER",
123+
SLACK_BROKER_SERVER_PRIVATE_KEY: b64(32, 11),
124+
SLACK_BROKER_SERVER_PUBLIC_KEY: b64(32, 12),
125+
SLACK_BROKER_SERVER_SIGNING_PRIVATE_KEY: b64(32, 13),
126+
SLACK_BROKER_PUBLIC_KEY: b64(32, 14),
127+
SLACK_BROKER_SIGNING_PUBLIC_KEY: b64(32, 15),
128+
SLACK_BROKER_ACCESS_TOKEN: "test-broker-token",
129+
SLACK_ALLOWED_USERS: "U_ALLOWED",
130+
SLACK_BROKER_POLL_INTERVAL_MS: "100",
131+
BRIDGE_API_PORT: String(apiPort),
132+
},
133+
stdio: ["ignore", "pipe", "pipe"],
134+
});
135+
136+
bridge.stdout.on("data", (chunk) => {
137+
bridgeStdout += chunk.toString();
138+
});
139+
bridge.stderr.on("data", (chunk) => {
140+
bridgeStderr += chunk.toString();
141+
});
142+
143+
children.push(bridge);
144+
145+
await waitFor(
146+
() => bridgeStdout.includes("Outbound API listening"),
147+
10_000,
148+
50,
149+
`timeout waiting for startup log; stdout=${bridgeStdout}; stderr=${bridgeStderr}`,
150+
);
151+
152+
const allLogsResponse = await fetch(`http://127.0.0.1:${apiPort}/logs`);
153+
expect(allLogsResponse.status).toBe(200);
154+
expect(allLogsResponse.headers.get("content-type")).toContain("text/plain");
155+
const allLogsText = await allLogsResponse.text();
156+
expect(allLogsText).toContain("Outbound API listening");
157+
158+
const filteredLogsResponse = await fetch(`http://127.0.0.1:${apiPort}/logs?filter=outbound`);
159+
expect(filteredLogsResponse.status).toBe(200);
160+
const filteredLogsText = await filteredLogsResponse.text();
161+
expect(filteredLogsText.toLowerCase()).toContain("outbound api listening");
162+
163+
const limitedLogsResponse = await fetch(`http://127.0.0.1:${apiPort}/logs?n=1`);
164+
expect(limitedLogsResponse.status).toBe(200);
165+
const limitedLogsText = await limitedLogsResponse.text();
166+
const limitedLines = limitedLogsText.trim() ? limitedLogsText.trim().split("\n") : [];
167+
expect(limitedLines.length).toBeLessThanOrEqual(1);
168+
169+
const invalidNResponse = await fetch(`http://127.0.0.1:${apiPort}/logs?n=0`);
170+
expect(invalidNResponse.status).toBe(400);
171+
const invalidNBody = await invalidNResponse.json();
172+
expect(invalidNBody.error).toContain("positive integer");
173+
});
174+
73175
it("acks poison messages from broker to avoid infinite retry loops", async () => {
74176
await sodium.ready;
75177

0 commit comments

Comments
 (0)