Skip to content

Commit 642ab97

Browse files
committed
ops: add broker poll/inbound/outbound health reporting
1 parent 6227ee1 commit 642ab97

4 files changed

Lines changed: 241 additions & 27 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ sudo baudbot deploy
6868
# start the service
6969
sudo baudbot start
7070

71-
# check health (includes deployed version + broker connection status)
71+
# check health (includes deployed version + broker connection/health status)
7272
sudo baudbot status
7373
sudo baudbot doctor
7474
```

bin/baudbot

Lines changed: 89 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,10 @@ broker_mode_configured() {
335335

336336
print_broker_connection_status() {
337337
local agent_user="${BAUDBOT_AGENT_USER:-baudbot_agent}"
338-
local pane=""
339-
local latest_signal=""
338+
local health_file="/home/$agent_user/.pi/agent/broker-health.json"
339+
local health_summary=""
340+
local connection_state=""
341+
local components_line=""
340342

341343
if ! broker_mode_configured "$agent_user"; then
342344
echo -e "${BOLD}broker connection:${RESET} not configured"
@@ -348,33 +350,108 @@ print_broker_connection_status() {
348350
echo -e "${BOLD}broker connection:${RESET} disconnected (bridge tmux session not running)"
349351
return 0
350352
}
351-
pane="$(sudo -u "$agent_user" tmux capture-pane -p -t slack-bridge -S -200 2>/dev/null || true)"
352353
elif [ "$(id -un)" = "$agent_user" ]; then
353354
tmux has-session -t slack-bridge 2>/dev/null || {
354355
echo -e "${BOLD}broker connection:${RESET} disconnected (bridge tmux session not running)"
355356
return 0
356357
}
357-
pane="$(tmux capture-pane -p -t slack-bridge -S -200 2>/dev/null || true)"
358358
else
359359
echo -e "${BOLD}broker connection:${RESET} configured (run with sudo for runtime status)"
360360
return 0
361361
fi
362362

363-
latest_signal="$(printf '%s\n' "$pane" | grep -E 'inbox poll failed|backing off|idle|pulled [0-9]+ message\(s\)|Slack broker pull bridge is running' | tail -1 || true)"
364-
case "$latest_signal" in
365-
*"inbox poll failed"*|*"backing off"*)
366-
echo -e "${BOLD}broker connection:${RESET} reconnecting (recent inbox poll failure)"
367-
;;
368-
*"idle"*|*"pulled "*)
363+
if [ ! -r "$health_file" ]; then
364+
echo -e "${BOLD}broker connection:${RESET} starting"
365+
echo -e "${BOLD}broker health:${RESET} unavailable (waiting for bridge health file)"
366+
return 0
367+
fi
368+
369+
health_summary="$(python3 - "$health_file" <<'PY'
370+
import json
371+
import sys
372+
from datetime import datetime, timezone
373+
374+
path = sys.argv[1]
375+
with open(path, 'r', encoding='utf-8') as f:
376+
h = json.load(f)
377+
378+
def parse_iso(s):
379+
if not s:
380+
return None
381+
try:
382+
if s.endswith('Z'):
383+
s = s[:-1] + '+00:00'
384+
dt = datetime.fromisoformat(s)
385+
if dt.tzinfo is None:
386+
dt = dt.replace(tzinfo=timezone.utc)
387+
return dt
388+
except Exception:
389+
return None
390+
391+
def age_seconds(ts):
392+
dt = parse_iso(ts)
393+
if not dt:
394+
return None
395+
return (datetime.now(timezone.utc) - dt).total_seconds()
396+
397+
def status(ok_ts, err_ts):
398+
ok_dt = parse_iso(ok_ts)
399+
err_dt = parse_iso(err_ts)
400+
if err_dt and (not ok_dt or err_dt >= ok_dt):
401+
return 'error'
402+
if ok_dt:
403+
return 'ok'
404+
return 'unknown'
405+
406+
poll = h.get('poll', {})
407+
inbound = h.get('inbound', {})
408+
ack = h.get('ack', {})
409+
outbound = h.get('outbound', {})
410+
411+
poll_age = age_seconds(poll.get('last_ok_at'))
412+
poll_failures = int(poll.get('consecutive_failures') or 0)
413+
poll_state = status(poll.get('last_ok_at'), poll.get('last_error_at'))
414+
415+
if poll_state == 'error' and poll_failures > 0:
416+
connection = 'reconnecting'
417+
elif poll_age is not None and poll_age <= 120:
418+
connection = 'connected'
419+
elif poll_age is not None:
420+
connection = 'stale'
421+
else:
422+
connection = 'starting'
423+
424+
inbound_state = status(inbound.get('last_process_ok_at'), inbound.get('last_process_error_at'))
425+
ack_state = status(ack.get('last_ok_at'), ack.get('last_error_at'))
426+
outbound_state = status(outbound.get('last_ok_at'), outbound.get('last_error_at'))
427+
428+
print(connection)
429+
print(f'poll={poll_state} inbound={inbound_state} ack={ack_state} outbound={outbound_state}')
430+
PY
431+
)"
432+
433+
connection_state="$(printf '%s\n' "$health_summary" | sed -n '1p')"
434+
components_line="$(printf '%s\n' "$health_summary" | sed -n '2p')"
435+
436+
case "$connection_state" in
437+
connected)
369438
echo -e "${BOLD}broker connection:${RESET} connected"
370439
;;
371-
*"Slack broker pull bridge is running"*)
440+
reconnecting)
441+
echo -e "${BOLD}broker connection:${RESET} reconnecting"
442+
;;
443+
stale)
444+
echo -e "${BOLD}broker connection:${RESET} stale (no recent successful poll)"
445+
;;
446+
starting)
372447
echo -e "${BOLD}broker connection:${RESET} starting"
373448
;;
374449
*)
375-
echo -e "${BOLD}broker connection:${RESET} unknown (bridge running, no recent poll telemetry)"
450+
echo -e "${BOLD}broker connection:${RESET} unknown"
376451
;;
377452
esac
453+
454+
[ -n "$components_line" ] && echo -e "${BOLD}broker health:${RESET} $components_line"
378455
}
379456

380457
pi_control_dir() {

docs/operations.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ sudo baudbot start
1010
sudo baudbot stop
1111
sudo baudbot restart
1212

13-
# Status and logs (status includes deployed version + broker connection state)
13+
# Status and logs (status includes deployed version + broker connection/health state)
1414
sudo baudbot status
1515
sudo baudbot logs
1616

slack-bridge/broker-bridge.mjs

Lines changed: 150 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const POLL_INTERVAL_MS = parseInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "
3636
const MAX_MESSAGES = parseInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 10);
3737
const DEDUPE_TTL_MS = parseInt(process.env.SLACK_BROKER_DEDUPE_TTL_MS || String(20 * 60 * 1000), 10);
3838
const MAX_BACKOFF_MS = 30_000;
39+
const BROKER_HEALTH_PATH = path.join(homedir(), ".pi", "agent", "broker-health.json");
3940

4041
function ts() {
4142
return new Date().toISOString();
@@ -95,6 +96,114 @@ let cryptoState = null;
9596

9697
const dedupe = new Map();
9798

/**
 * In-memory health snapshot for the broker bridge.
 *
 * Holds process metadata (start time, outbound mode, broker URL, workspace)
 * plus one sub-record per tracked area: `poll`, `inbound`, `ack`, `outbound`.
 * All timestamps and error fields start as `null`; `poll` additionally
 * starts with a zero consecutive-failure counter.
 */
const brokerHealth = (() => {
  const isoNow = () => new Date().toISOString();

  // `ack` and `outbound` share the same ok/error/last_error shape.
  const okErrorRecord = () => ({
    last_ok_at: null,
    last_error_at: null,
    last_error: null,
  });

  return {
    started_at: isoNow(),
    updated_at: isoNow(),
    outbound_mode: outboundMode,
    broker_url: brokerBaseUrl,
    workspace_id: workspaceId,
    poll: {
      last_ok_at: null,
      last_error_at: null,
      consecutive_failures: 0,
      last_error: null,
    },
    // Inbound tracks decrypt and process stages separately but keeps a
    // single shared `last_error` string.
    inbound: {
      last_decrypt_ok_at: null,
      last_decrypt_error_at: null,
      last_process_ok_at: null,
      last_process_error_at: null,
      last_error: null,
    },
    ack: okErrorRecord(),
    outbound: okErrorRecord(),
  };
})();
129+
/**
 * Turn any thrown value into a short, non-empty error string.
 *
 * - `Error` instances contribute their `message`, falling back to `name`
 *   when the message is empty.
 * - Error-like objects (anything with a non-empty string `message`)
 *   contribute that message instead of stringifying to "[object Object]".
 * - Everything else is stringified; falsy values become "unknown error".
 *
 * @param {unknown} err - The thrown value or an already-formatted message.
 * @returns {string} At most 400 characters of error text.
 */
function trimError(err) {
  let msg;
  if (err instanceof Error) {
    msg = err.message || err.name;
  } else if (typeof err?.message === "string" && err.message !== "") {
    msg = err.message;
  } else {
    msg = String(err || "unknown error");
  }
  // Cap the length so one oversized error cannot bloat the stored record.
  return (msg || "unknown error").slice(0, 400);
}
134+
/**
 * Write the in-memory `brokerHealth` snapshot to `BROKER_HEALTH_PATH`.
 *
 * Refreshes `updated_at`, serializes the snapshot as pretty-printed JSON
 * into a sibling `.tmp` file (created with mode 0600), then renames it over
 * the target so a reader does not pick up a partially written file.
 *
 * Best-effort: filesystem failures are logged and swallowed. This function
 * is reached from error handlers via `markHealth`, so letting a telemetry
 * write throw would turn a recoverable failure into an unhandled one.
 */
function persistBrokerHealth() {
  brokerHealth.updated_at = new Date().toISOString();
  const tmp = `${BROKER_HEALTH_PATH}.tmp`;
  try {
    fs.mkdirSync(path.dirname(BROKER_HEALTH_PATH), { recursive: true });
    fs.writeFileSync(tmp, `${JSON.stringify(brokerHealth, null, 2)}\n`, { mode: 0o600 });
    fs.renameSync(tmp, BROKER_HEALTH_PATH);
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    logWarn(`⚠️ failed to persist broker health to ${BROKER_HEALTH_PATH}: ${reason}`);
  }
}
143+
/**
 * Record a success or failure for one health section and persist the result.
 *
 * Recognized sections: "poll", "inbound_decrypt", "inbound_process", "ack",
 * "outbound". Any other section name is ignored and nothing is written.
 *
 * @param {string} section - Which health section to update.
 * @param {boolean} ok - Truthy for success, falsy for failure.
 * @param {unknown} [err=null] - Failure cause; only read when `ok` is falsy.
 */
function markHealth(section, ok, err = null) {
  const stamp = new Date().toISOString();

  switch (section) {
    case "poll": {
      const { poll } = brokerHealth;
      if (ok) {
        poll.last_ok_at = stamp;
        poll.consecutive_failures = 0;
        poll.last_error = null;
      } else {
        poll.last_error_at = stamp;
        poll.consecutive_failures += 1;
        poll.last_error = trimError(err);
      }
      break;
    }

    case "inbound_decrypt":
    case "inbound_process": {
      const { inbound } = brokerHealth;
      const stage = section === "inbound_decrypt" ? "decrypt" : "process";
      if (ok) {
        // Success only stamps the stage; the shared last_error is left as is.
        inbound[`last_${stage}_ok_at`] = stamp;
      } else {
        inbound[`last_${stage}_error_at`] = stamp;
        inbound.last_error = trimError(err);
      }
      break;
    }

    case "ack":
    case "outbound": {
      const record = brokerHealth[section];
      if (ok) {
        record.last_ok_at = stamp;
        record.last_error = null;
      } else {
        record.last_error_at = stamp;
        record.last_error = trimError(err);
      }
      break;
    }

    default:
      // Unknown section: no state change and no write.
      return;
  }

  persistBrokerHealth();
}
206+
98207
function toBase64(bytes) {
99208
return Buffer.from(bytes).toString("base64");
100209
}
@@ -329,15 +438,22 @@ async function sendViaBroker({ action, routing, body }) {
329438
const sig = sodium.crypto_sign_detached(canonical, cryptoState.serverSignSecretKey);
330439
const signature = toBase64(sig);
331440

332-
return brokerFetch("/api/send", {
333-
workspace_id: workspaceId,
334-
action,
335-
routing,
336-
encrypted_body: encryptedBody,
337-
nonce: nonceB64,
338-
timestamp,
339-
signature,
340-
});
441+
try {
442+
const result = await brokerFetch("/api/send", {
443+
workspace_id: workspaceId,
444+
action,
445+
routing,
446+
encrypted_body: encryptedBody,
447+
nonce: nonceB64,
448+
timestamp,
449+
signature,
450+
});
451+
markHealth("outbound", true);
452+
return result;
453+
} catch (err) {
454+
markHealth("outbound", false, err);
455+
throw err;
456+
}
341457
}
342458

343459
/**
@@ -386,11 +502,13 @@ async function sendDirectToSlack(apiMethod, params) {
386502
const error = data.error || response.statusText;
387503
throw new Error(`Slack API ${apiMethod} failed: ${sanitizeError(error)}`);
388504
}
389-
505+
506+
markHealth("outbound", true);
390507
return data;
391508
} catch (err) {
392509
// Sanitize any error messages to prevent token leakage
393510
const sanitizedMessage = sanitizeError(err.message || String(err));
511+
markHealth("outbound", false, sanitizedMessage);
394512
throw new Error(sanitizedMessage);
395513
}
396514
}
@@ -519,7 +637,15 @@ async function processPulledMessage(message) {
519637
throw new Error("invalid broker envelope signature");
520638
}
521639

522-
const payload = decryptEnvelope(message);
640+
let payload;
641+
try {
642+
payload = decryptEnvelope(message);
643+
markHealth("inbound_decrypt", true);
644+
} catch (err) {
645+
markHealth("inbound_decrypt", false, err);
646+
throw err;
647+
}
648+
523649
logInfo(`📦 decrypted envelope — type: ${payload?.type || "unknown"}`);
524650

525651
if (payload?.type !== "event_callback") {
@@ -723,6 +849,7 @@ async function startPollLoop() {
723849
pruneDedupe();
724850

725851
const messages = await pullInbox();
852+
markHealth("poll", true);
726853
pollCount++;
727854
const ackIds = [];
728855

@@ -756,13 +883,15 @@ async function startPollLoop() {
756883
logInfo(`📩 processing message ${message.message_id}`);
757884
const ok = await processPulledMessage(message);
758885
if (ok) {
886+
markHealth("inbound_process", true);
759887
dedupe.set(message.message_id, Date.now() + DEDUPE_TTL_MS);
760888
ackIds.push(message.message_id);
761889
logInfo(`✅ processed & acked message ${message.message_id}`);
762890
} else {
763891
logWarn(`⚠️ message ${message.message_id} returned not-ok, will retry next poll`);
764892
}
765893
} catch (err) {
894+
markHealth("inbound_process", false, err);
766895
const errMsg = err instanceof Error ? err.message : "unknown error";
767896
const errStack = err instanceof Error ? err.stack : "";
768897
logError(`❌ message processing failed (${message.message_id}): ${errMsg}`);
@@ -777,13 +906,20 @@ async function startPollLoop() {
777906
}
778907

779908
if (ackIds.length > 0) {
780-
await ackInbox(ackIds);
781-
logInfo(`📤 acked ${ackIds.length} message(s)`);
909+
try {
910+
await ackInbox(ackIds);
911+
markHealth("ack", true);
912+
logInfo(`📤 acked ${ackIds.length} message(s)`);
913+
} catch (err) {
914+
markHealth("ack", false, err);
915+
throw err;
916+
}
782917
}
783918

784919
backoffMs = POLL_INTERVAL_MS;
785920
await sleep(POLL_INTERVAL_MS);
786921
} catch (err) {
922+
markHealth("poll", false, err);
787923
const errMsg = err instanceof Error ? err.message : "unknown error";
788924
const errStack = err instanceof Error ? err.stack : "";
789925
logError(`❌ inbox poll failed: ${errMsg}`);
@@ -811,6 +947,7 @@ async function startPollLoop() {
811947

812948
refreshSocket();
813949
startApiServer();
950+
persistBrokerHealth();
814951
logInfo("⚡ Slack broker pull bridge is running!");
815952
logInfo(` outbound mode: ${outboundMode} ${outboundMode === "direct" ? "(using SLACK_BOT_TOKEN)" : "(via broker)"}`);
816953
logInfo(` broker: ${brokerBaseUrl}`);

0 commit comments

Comments
 (0)