Skip to content

Commit f1dacab

Browse files
authored
feat(observability): add runtime-state gauges for SSE, circuits, list-cache (#4177)
Three subsystems hold meaningful per-pod runtime state but emitted no metric for it. Add OpenTelemetry observable gauges (sampled at scrape time, exported via the existing Prometheus reader on /metrics): - sse_hub.connections — active SSE listeners (bounded by MAX_TOTAL_CONNECTIONS=500; chart vs the cap, catch cleanup leaks) - mcp_circuit_breaker.circuits{state=open|half_open} — MCP connection circuits by state; a rising `open` count = downstream servers unreachable / requests failing fast - mcp_list_cache.pending_revalidations — in-flight background list revalidations; complements the existing fetches counter All are thin reads over already-held state — no new tracking, no hot-path cost beyond an O(n≤cap) scan per scrape for the circuit gauge.
1 parent 77bfba4 commit f1dacab

3 files changed

Lines changed: 44 additions & 0 deletions

File tree

apps/mesh/src/event-bus/sse-hub.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
* - Pluggable broadcast: strategy handles cross-process replication
1818
*/
1919

20+
import { meter } from "../observability";
21+
2022
// ============================================================================
2123
// Types
2224
// ============================================================================
@@ -234,3 +236,13 @@ function matchesAnyPattern(eventType: string, patterns: string[]): boolean {
234236

235237
/** Global SSE hub instance */
236238
export const sseHub = new SSEHub();
239+
240+
// Live SSE connection count (sampled at scrape time). Bounded by
241+
// MAX_TOTAL_CONNECTIONS — chart against that cap to spot saturation, and watch
242+
// for a count that never drops (a listener-cleanup leak).
243+
meter
244+
.createObservableGauge("sse_hub.connections", {
245+
description: "Active SSE listeners held by this pod",
246+
unit: "{connections}",
247+
})
248+
.addCallback((r) => r.observe(sseHub.count));

apps/mesh/src/mcp-clients/circuit-breaker.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
CIRCUIT_BREAKER_FAILURE_THRESHOLD,
1616
CIRCUIT_BREAKER_MAX_ENTRIES,
1717
} from "../core/constants";
18+
import { meter } from "../observability";
1819

1920
type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";
2021

@@ -27,6 +28,26 @@ interface CircuitEntry {
2728

2829
const circuits = new Map<string, CircuitEntry>();
2930

31+
// Live circuit counts by state (sampled at scrape time). A rising `open` count
32+
// means downstream MCP servers are unreachable and requests are failing fast —
33+
// the signal an operator wants when proxied tools start erroring. O(n) over the
34+
// map (n ≤ CIRCUIT_BREAKER_MAX_ENTRIES) per scrape.
35+
meter
36+
.createObservableGauge("mcp_circuit_breaker.circuits", {
37+
description: "MCP connection circuits by state (open, half_open)",
38+
unit: "{circuits}",
39+
})
40+
.addCallback((r) => {
41+
let open = 0;
42+
let halfOpen = 0;
43+
for (const c of circuits.values()) {
44+
if (c.state === "OPEN") open++;
45+
else if (c.state === "HALF_OPEN") halfOpen++;
46+
}
47+
r.observe(open, { state: "open" });
48+
r.observe(halfOpen, { state: "half_open" });
49+
});
50+
3051
export class CircuitOpenError extends Error {
3152
readonly cooldownRemainingMs: number;
3253
constructor(connectionId: string, cooldownRemainingMs: number) {

apps/mesh/src/mcp-clients/mcp-list-cache.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,17 @@ export class JetStreamKVMcpListCache implements McpListCache {
9797
// Module-level revalidation tracking (prevents thundering herd)
9898
const revalidating = new Set<string>();
9999

100+
// In-flight background list revalidations (sampled at scrape time). Sits
101+
// alongside the mcp_list_cache.fetches counter: a count that stays high means
102+
// upstream list calls aren't keeping up with staleness — refresh load worth
103+
// watching next to hit/miss/stale rates.
104+
meter
105+
.createObservableGauge("mcp_list_cache.pending_revalidations", {
106+
description: "In-flight background MCP list revalidations on this pod",
107+
unit: "{revalidations}",
108+
})
109+
.addCallback((r) => r.observe(revalidating.size));
110+
100111
// Per-(type,connection) timestamp of the last revalidation we scheduled, used to
101112
// throttle background revalidation. Without it, every cache hit reconnects
102113
// downstream — so a UI polling tools/list every few seconds hammers the upstream

0 commit comments

Comments
 (0)