Skip to content

Commit 57ccddb

Browse files
authored
bridge: include inbox.pull observability meta (#149)
1 parent c8acc8b commit 57ccddb

4 files changed

Lines changed: 86 additions & 4 deletions

File tree

.env.schema

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ SLACK_BROKER_ACCESS_TOKEN_EXPIRES_AT=
148148
# @sensitive=false @type=string
149149
SLACK_BROKER_ACCESS_TOKEN_SCOPES=
150150

151+
# Optional agent version override used in broker observability metadata.
152+
# If unset, broker bridge falls back to ~/.pi/agent/baudbot-version.json (if present).
153+
# @sensitive=false @type=string
154+
BAUDBOT_AGENT_VERSION=
155+
151156
# Broker pull cadence in milliseconds (default: 3000)
152157
# @sensitive=false @type=number
153158
SLACK_BROKER_POLL_INTERVAL_MS=3000

CONFIGURATION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ Set by `sudo baudbot broker register` when using brokered Slack OAuth flow.
111111
| `SLACK_BROKER_MAX_MESSAGES` | Max leased messages per poll request (default: `10`) |
112112
| `SLACK_BROKER_WAIT_SECONDS` | Long-poll wait window for `/api/inbox/pull` (default: `20`, set `0` for immediate short-poll, max `25`) |
113113
| `SLACK_BROKER_DEDUPE_TTL_MS` | Dedupe cache TTL in milliseconds (default: `1200000`) |
114+
| `BAUDBOT_AGENT_VERSION` | Optional override for broker observability `meta.agent_version` (otherwise read from `~/.pi/agent/baudbot-version.json` when available) |
114115

115116
### Kernel (Cloud Browsers)
116117

slack-bridge/broker-bridge.mjs

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import * as net from "node:net";
1010
import * as fs from "node:fs";
1111
import * as path from "node:path";
12-
import { homedir } from "node:os";
12+
import { homedir, uptime as getSystemUptimeSeconds } from "node:os";
1313
import { createServer } from "node:http";
1414
import sodium from "libsodium-wrappers-sumo";
1515
import {
@@ -52,6 +52,7 @@ const DEDUPE_TTL_MS = clampInt(
5252
const MAX_BACKOFF_MS = 30_000;
5353
const INBOX_PROTOCOL_VERSION = "2026-02-1";
5454
const BROKER_HEALTH_PATH = path.join(homedir(), ".pi", "agent", "broker-health.json");
55+
const BAUDBOT_VERSION_PATH = path.join(homedir(), ".pi", "agent", "baudbot-version.json");
5556
const LOG_BUFFER_MAX_LINES = 1000;
5657

5758
const logLineBuffer = [];
@@ -153,6 +154,8 @@ let cryptoState = null;
153154

154155
const dedupe = new Map();
155156
let brokerTokenExpiryFormatWarned = false;
157+
let brokerPollCount = 0;
158+
const bridgeStartedAtMs = Date.now();
156159

157160
const brokerHealth = {
158161
started_at: new Date().toISOString(),
@@ -185,6 +188,67 @@ const brokerHealth = {
185188
},
186189
};
187190

191+
function readAgentVersion() {
192+
const explicitVersion = String(process.env.BAUDBOT_AGENT_VERSION || "").trim();
193+
if (explicitVersion) return explicitVersion;
194+
195+
try {
196+
const raw = fs.readFileSync(BAUDBOT_VERSION_PATH, "utf8");
197+
const parsed = JSON.parse(raw);
198+
if (typeof parsed.short === "string" && parsed.short.trim()) return parsed.short.trim();
199+
if (typeof parsed.sha === "string" && parsed.sha.trim()) return parsed.sha.trim();
200+
} catch {
201+
// Ignore read/parse failures. Observability metadata falls back to "unknown".
202+
}
203+
204+
return "unknown";
205+
}
206+
207+
const agentVersion = readAgentVersion();
208+
209+
function countActivePiSessions() {
210+
try {
211+
const entries = fs.readdirSync(SOCKET_DIR, { withFileTypes: true });
212+
let activeSessions = 0;
213+
let activeDevAgents = 0;
214+
215+
for (const entry of entries) {
216+
const name = entry.name;
217+
if (name.endsWith(".sock")) {
218+
activeSessions += 1;
219+
}
220+
if (/^dev-agent-.+\.alias$/.test(name) && (entry.isFile() || entry.isSymbolicLink())) {
221+
activeDevAgents += 1;
222+
}
223+
}
224+
225+
return { activeSessions, activeDevAgents };
226+
} catch {
227+
return { activeSessions: 0, activeDevAgents: 0 };
228+
}
229+
}
230+
231+
function buildPullMeta(maxMessages, waitSeconds) {
232+
const { activeSessions, activeDevAgents } = countActivePiSessions();
233+
const bridgeUptimeHours = Math.max(0, (Date.now() - bridgeStartedAtMs) / (1000 * 60 * 60));
234+
const systemUptimeHours = Math.max(0, getSystemUptimeSeconds() / (60 * 60));
235+
236+
return {
237+
agent_version: agentVersion,
238+
bridge_uptime_hours: bridgeUptimeHours,
239+
system_uptime_hours: systemUptimeHours,
240+
heartbeat_runs: brokerPollCount,
241+
heartbeat_consecutive_errors: brokerHealth.poll.consecutive_failures,
242+
heartbeat_last_ok_at: brokerHealth.poll.last_ok_at,
243+
active_sessions: activeSessions,
244+
active_dev_agents: activeDevAgents,
245+
outbound_mode: outboundMode,
246+
poll_count: brokerPollCount + 1,
247+
max_messages: maxMessages,
248+
wait_seconds: waitSeconds,
249+
};
250+
}
251+
188252
function trimError(err) {
189253
const msg = err instanceof Error ? err.message : String(err || "unknown error");
190254
return msg.slice(0, 400);
@@ -496,6 +560,7 @@ async function pullInbox() {
496560
wait_seconds: BROKER_WAIT_SECONDS,
497561
timestamp,
498562
signature,
563+
meta: buildPullMeta(MAX_MESSAGES, BROKER_WAIT_SECONDS),
499564
};
500565

501566
const inboxPullResponseBody = await brokerFetch("/api/inbox/pull", inboxPullRequestBody);
@@ -941,7 +1006,6 @@ function startApiServer() {
9411006

9421007
async function startPollLoop() {
9431008
let backoffMs = POLL_INTERVAL_MS;
944-
let pollCount = 0;
9451009
let lastStatusLog = Date.now();
9461010
const STATUS_LOG_INTERVAL_MS = 60_000; // log a status line every 60s even when idle
9471011

@@ -953,7 +1017,7 @@ async function startPollLoop() {
9531017
const messages = await pullInbox();
9541018
pollSucceeded = true;
9551019
markHealth("poll", true);
956-
pollCount++;
1020+
brokerPollCount++;
9571021
const ackIds = [];
9581022

9591023
if (messages.length > 0) {
@@ -962,7 +1026,7 @@ async function startPollLoop() {
9621026

9631027
// Periodic idle status log so you know the bridge is alive
9641028
if (messages.length === 0 && Date.now() - lastStatusLog >= STATUS_LOG_INTERVAL_MS) {
965-
logInfo(`💤 idle — ${pollCount} polls since start, dedupe cache: ${dedupe.size} entries`);
1029+
logInfo(`💤 idle — ${brokerPollCount} polls since start, dedupe cache: ${dedupe.size} entries`);
9661030
lastStatusLog = Date.now();
9671031
}
9681032

test/broker-bridge.integration.test.mjs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,18 @@ describe("broker pull bridge semi-integration", () => {
569569
expect(pullPayload.protocol_version).toBe("2026-02-1");
570570
expect(pullPayload.max_messages).toBe(10);
571571
expect(pullPayload.wait_seconds).toBe(20);
572+
expect(pullPayload.meta).toBeTruthy();
573+
expect(pullPayload.meta.outbound_mode).toBe("broker");
574+
expect(pullPayload.meta.poll_count).toBeGreaterThanOrEqual(1);
575+
expect(pullPayload.meta.max_messages).toBe(10);
576+
expect(pullPayload.meta.wait_seconds).toBe(20);
577+
expect(typeof pullPayload.meta.bridge_uptime_hours).toBe("number");
578+
expect(typeof pullPayload.meta.system_uptime_hours).toBe("number");
579+
expect(typeof pullPayload.meta.active_sessions).toBe("number");
580+
expect(typeof pullPayload.meta.active_dev_agents).toBe("number");
581+
expect(typeof pullPayload.meta.agent_version).toBe("string");
582+
expect(pullPayload.meta.heartbeat_runs).toBeGreaterThanOrEqual(0);
583+
expect(pullPayload.meta.heartbeat_consecutive_errors).toBeGreaterThanOrEqual(0);
572584

573585
const canonical = canonicalizeProtocolRequest(workspaceId, "2026-02-1", "inbox.pull", pullPayload.timestamp, {
574586
max_messages: 10,

0 commit comments

Comments
 (0)