Skip to content

Commit 17d8537

Browse files
fix(069-A2): count blocked attempts + wire observability hot-reload
Addresses CodexReviewer findings #2 and #3 on PR #560 (the O(1) fix cleared #1 once CI went green). #2 — blocked tool attempts were missing from the usage aggregate. They are persisted as blocked `policy_decision` records, but `Apply` dropped all non-tool_call records and `handlePolicyDecision` never fed the aggregate, so the contract's per-tool `blocked` field was permanently 0. `Apply` now also folds blocked policy_decisions: a blocked attempt never executed, so it increments only `Blocked` + `LastUsed` — not `Calls`, latency, bytes, or the executed-call timeline. `handlePolicyDecision` calls `usage.Apply` on save success so the live path matches a cold-start rebuild-from-scan. Extracted a `tool()` get-or-create helper. #3 — `observability.usage_persist_interval` claimed hot-reload but was only read at construction. `DetectConfigChanges` now flags an `observability` change and `ApplyConfig` pushes the new cadence into the running ActivityService via `SetUsagePersistInterval` (the flush loop already re-reads the interval each cycle). Test-first (ENG-1): aggregate counts blocked-only (not Calls/latency/ timeline); live `handlePolicyDecision` folds blocked into the snapshot; `DetectConfigChanges` detects observability as hot-reloadable; end-to-end `ApplyConfig` applies the new interval to a running runtime. Repointed the "ignores non-tool_calls" test to a non-blocked decision. Contract documents `blocked` semantics. Full internal/runtime+config+storage -race green; lint 0; personal+server builds. Related #560 Related MCP-835 Co-Authored-By: Paperclip <noreply@paperclip.ing>
1 parent 65540ab commit 17d8537

9 files changed

Lines changed: 196 additions & 21 deletions

File tree

internal/runtime/activity_service.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,15 @@ func (s *ActivityService) handlePolicyDecision(evt Event) {
362362
zap.Error(err),
363363
zap.String("server_name", serverName),
364364
zap.String("decision", decision))
365+
return
366+
}
367+
368+
// Fold blocked attempts into the usage aggregate (Spec 069 A2). Apply
369+
// ignores non-blocked decisions, so passing every policy decision is safe.
370+
// Done only on save success so the in-memory rollup stays consistent with a
371+
// cold-start rebuild that re-scans persisted records.
372+
if s.usage != nil {
373+
s.usage.Apply(record)
365374
}
366375
}
367376

internal/runtime/apply_config_restart_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"path/filepath"
66
"runtime"
77
"testing"
8+
"time"
89

910
"go.uber.org/zap"
1011

@@ -107,6 +108,41 @@ func TestApplyConfig_HotReloadableChange(t *testing.T) {
107108
assert.Equal(t, 20, savedCfg.ToolsLimit, "Config file should be updated with new ToolsLimit value")
108109
}
109110

111+
// TestApplyConfig_ObservabilityHotReload (MCP-835 / Codex finding #3): changing
112+
// the observability usage persist interval must hot-reload into the running
113+
// ActivityService — previously ApplyConfig only handled logging/truncator, so
114+
// SetUsagePersistInterval's "hot-reloadable" promise was unfulfilled.
115+
func TestApplyConfig_ObservabilityHotReload(t *testing.T) {
116+
tmpDir := t.TempDir()
117+
cfgPath := filepath.Join(tmpDir, "config.json")
118+
119+
initialCfg := config.DefaultConfig()
120+
initialCfg.Listen = "127.0.0.1:8080"
121+
initialCfg.DataDir = tmpDir
122+
require.NoError(t, config.SaveConfig(initialCfg, cfgPath))
123+
124+
rt, err := New(initialCfg, cfgPath, zap.NewNop())
125+
require.NoError(t, err)
126+
defer func() { _ = rt.Close() }()
127+
128+
// Default cadence is 30s before the reload.
129+
require.Equal(t, DefaultUsagePersistInterval, rt.ActivityService().usagePersistInterval())
130+
131+
newCfg := config.DefaultConfig()
132+
newCfg.Listen = "127.0.0.1:8080"
133+
newCfg.DataDir = tmpDir
134+
newCfg.Observability.UsagePersistInterval = config.Duration(10 * time.Second)
135+
136+
result, err := rt.ApplyConfig(newCfg, cfgPath)
137+
require.NoError(t, err)
138+
require.NotNil(t, result)
139+
140+
assert.False(t, result.RequiresRestart, "observability cadence change is hot-reloadable")
141+
assert.Contains(t, result.ChangedFields, "observability")
142+
assert.Equal(t, 10*time.Second, rt.ActivityService().usagePersistInterval(),
143+
"new persist interval must be applied to the running ActivityService")
144+
}
145+
110146
// TestApplyConfig_SaveFailure tests handling of save errors
111147
func TestApplyConfig_SaveFailure(t *testing.T) {
112148
// Skip on Windows: chmod on directories doesn't reliably prevent file creation

internal/runtime/config_hotreload.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ func DetectConfigChanges(oldCfg, newCfg *config.Config) *ConfigApplyResult {
133133
result.ChangedFields = append(result.ChangedFields, "environment")
134134
}
135135

136+
// Observability cadence (Spec 069 A2 — can be hot-reloaded; the usage flush
137+
// loop re-reads the interval each cycle, so applying it is just a setter).
138+
if !reflect.DeepEqual(oldCfg.Observability, newCfg.Observability) {
139+
result.ChangedFields = append(result.ChangedFields, "observability")
140+
}
141+
136142
// If no changes detected
137143
if len(result.ChangedFields) == 0 {
138144
result.AppliedImmediately = false

internal/runtime/config_hotreload_test.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,32 @@ import (
99
"github.com/stretchr/testify/require"
1010
)
1111

12+
// TestDetectConfigChanges_Observability (MCP-835 / Codex finding #3): changing
13+
// the observability usage cadence must be detected as a hot-reloadable change so
14+
// ApplyConfig can push the new persist interval to the running ActivityService.
15+
// SetUsagePersistInterval advertises hot-reload; the detector must back it.
16+
func TestDetectConfigChanges_Observability(t *testing.T) {
17+
base := &config.Config{
18+
Listen: "127.0.0.1:8080", DataDir: "/d", TLS: &config.TLSConfig{},
19+
Observability: &config.ObservabilityConfig{
20+
UsageCacheTTL: config.Duration(5 * time.Second),
21+
UsagePersistInterval: config.Duration(30 * time.Second),
22+
},
23+
}
24+
changed := &config.Config{
25+
Listen: "127.0.0.1:8080", DataDir: "/d", TLS: &config.TLSConfig{},
26+
Observability: &config.ObservabilityConfig{
27+
UsageCacheTTL: config.Duration(5 * time.Second),
28+
UsagePersistInterval: config.Duration(10 * time.Second),
29+
},
30+
}
31+
32+
result := DetectConfigChanges(base, changed)
33+
require.True(t, result.Success)
34+
assert.Contains(t, result.ChangedFields, "observability")
35+
assert.False(t, result.RequiresRestart, "cadence change is hot-reloadable")
36+
}
37+
1238
func TestDetectConfigChanges(t *testing.T) {
1339
baseConfig := &config.Config{
1440
Listen: "127.0.0.1:8080",
@@ -49,7 +75,7 @@ func TestDetectConfigChanges(t *testing.T) {
4975
Listen: ":9090", // Changed
5076
DataDir: "/test/data",
5177
APIKey: "test-key",
52-
ToolsLimit: 15,
78+
ToolsLimit: 15,
5379
ToolResponseLimit: 1000,
5480
CallToolTimeout: config.Duration(60 * time.Second),
5581
Servers: []*config.ServerConfig{},
@@ -67,7 +93,7 @@ func TestDetectConfigChanges(t *testing.T) {
6793
Listen: "127.0.0.1:8080",
6894
DataDir: "/different/data", // Changed
6995
APIKey: "test-key",
70-
ToolsLimit: 15,
96+
ToolsLimit: 15,
7197
ToolResponseLimit: 1000,
7298
CallToolTimeout: config.Duration(60 * time.Second),
7399
Servers: []*config.ServerConfig{},
@@ -85,7 +111,7 @@ func TestDetectConfigChanges(t *testing.T) {
85111
Listen: "127.0.0.1:8080",
86112
DataDir: "/test/data",
87113
APIKey: "new-key", // Changed
88-
ToolsLimit: 15,
114+
ToolsLimit: 15,
89115
ToolResponseLimit: 1000,
90116
CallToolTimeout: config.Duration(60 * time.Second),
91117
Servers: []*config.ServerConfig{},
@@ -103,7 +129,7 @@ func TestDetectConfigChanges(t *testing.T) {
103129
Listen: "127.0.0.1:8080",
104130
DataDir: "/test/data",
105131
APIKey: "test-key",
106-
ToolsLimit: 15,
132+
ToolsLimit: 15,
107133
ToolResponseLimit: 1000,
108134
CallToolTimeout: config.Duration(60 * time.Second),
109135
Servers: []*config.ServerConfig{},
@@ -124,7 +150,7 @@ func TestDetectConfigChanges(t *testing.T) {
124150
Listen: "127.0.0.1:8080",
125151
DataDir: "/test/data",
126152
APIKey: "test-key",
127-
ToolsLimit: 20, // Changed
153+
ToolsLimit: 20, // Changed
128154
ToolResponseLimit: 1000,
129155
CallToolTimeout: config.Duration(60 * time.Second),
130156
Servers: []*config.ServerConfig{},
@@ -144,7 +170,7 @@ func TestDetectConfigChanges(t *testing.T) {
144170
Listen: "127.0.0.1:8080",
145171
DataDir: "/test/data",
146172
APIKey: "test-key",
147-
ToolsLimit: 15,
173+
ToolsLimit: 15,
148174
ToolResponseLimit: 1000,
149175
CallToolTimeout: config.Duration(60 * time.Second),
150176
Servers: []*config.ServerConfig{ // Changed

internal/runtime/runtime.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,6 +1267,15 @@ func (r *Runtime) ApplyConfig(newCfg *config.Config, cfgPath string) (*ConfigApp
12671267
r.truncator = truncate.NewTruncator(newCfg.ToolResponseLimit)
12681268
}
12691269

1270+
// Apply observability usage cadence (Spec 069 A2 — hot-reloadable). The
1271+
// usage flush loop re-reads the interval each cycle, so the setter suffices.
1272+
if contains(result.ChangedFields, "observability") && r.activityService != nil &&
1273+
newCfg.Observability != nil && newCfg.Observability.UsagePersistInterval.Duration() > 0 {
1274+
r.logger.Info("Observability usage persist interval changed",
1275+
zap.Duration("new_interval", newCfg.Observability.UsagePersistInterval.Duration()))
1276+
r.activityService.SetUsagePersistInterval(newCfg.Observability.UsagePersistInterval.Duration())
1277+
}
1278+
12701279
// Capture app context, config path, and config copy while we still hold the lock
12711280
appCtx := r.appCtx
12721281
cfgPathCopy := r.cfgPath

internal/runtime/usage_aggregate.go

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -147,30 +147,47 @@ func newUsageAggregate() *UsageAggregate {
147147
}
148148
}
149149

150-
// Apply folds a single activity record into the aggregate. Non tool_call
151-
// records and records without a tool name are ignored. Apply is not safe for
152-
// concurrent use; it is called only by the owning goroutine (see UsageStore).
153-
func (a *UsageAggregate) Apply(rec *storage.ActivityRecord) {
154-
if rec == nil || rec.Type != storage.ActivityTypeToolCall || rec.ToolName == "" {
155-
return
156-
}
157-
158-
key := toolKey(rec.ServerName, rec.ToolName)
150+
// tool returns the per-(server,tool) rollup, creating it on first use. It also
151+
// defensively resizes a persisted snapshot from an older latency-bucket layout
152+
// rather than panicking on index.
153+
func (a *UsageAggregate) tool(server, toolName string) *ToolUsage {
154+
key := toolKey(server, toolName)
159155
tu := a.Tools[key]
160156
if tu == nil {
161157
tu = &ToolUsage{
162-
Server: rec.ServerName,
163-
Tool: rec.ToolName,
158+
Server: server,
159+
Tool: toolName,
164160
LatencyBuckets: make([]int64, numLatencyBuckets()),
165161
}
166162
a.Tools[key] = tu
167163
} else if len(tu.LatencyBuckets) != numLatencyBuckets() {
168-
// Defensive: a persisted snapshot from an older bucket layout is
169-
// resized rather than panicking on index.
170164
resized := make([]int64, numLatencyBuckets())
171165
copy(resized, tu.LatencyBuckets)
172166
tu.LatencyBuckets = resized
173167
}
168+
return tu
169+
}
170+
171+
// Apply folds a single activity record into the aggregate. It accepts executed
172+
// tool_calls and blocked policy_decisions (the form a policy-prevented tool
173+
// attempt is persisted as — MCP-835); all other records, and records without a
174+
// tool name, are ignored. Apply is not safe for concurrent use; it is called
175+
// only by the owning goroutine (see UsageStore).
176+
func (a *UsageAggregate) Apply(rec *storage.ActivityRecord) {
177+
if rec == nil || rec.ToolName == "" {
178+
return
179+
}
180+
switch {
181+
case rec.Type == storage.ActivityTypeToolCall:
182+
// folded below
183+
case rec.Type == storage.ActivityTypePolicyDecision && rec.Status == "blocked":
184+
a.applyBlocked(rec)
185+
return
186+
default:
187+
return
188+
}
189+
190+
tu := a.tool(rec.ServerName, rec.ToolName)
174191

175192
tu.Calls++
176193
switch rec.Status {
@@ -195,6 +212,20 @@ func (a *UsageAggregate) Apply(rec *storage.ActivityRecord) {
195212
a.applyTimeBucket(rec)
196213
}
197214

215+
// applyBlocked folds a policy-blocked attempt into the per-tool Blocked counter.
216+
// A blocked attempt never executed the tool, so it contributes no Calls,
217+
// latency, or bytes, and does not enter the executed-call timeline — it only
218+
// bumps Blocked and LastUsed. This keeps the contract's per-tool `blocked`
219+
// count non-zero (the field was previously dead) without polluting latency
220+
// percentiles or byte averages with non-executed attempts.
221+
func (a *UsageAggregate) applyBlocked(rec *storage.ActivityRecord) {
222+
tu := a.tool(rec.ServerName, rec.ToolName)
223+
tu.Blocked++
224+
if rec.Timestamp.After(tu.LastUsed) {
225+
tu.LastUsed = rec.Timestamp
226+
}
227+
}
228+
198229
func (a *UsageAggregate) applyTimeBucket(rec *storage.ActivityRecord) {
199230
start := rec.Timestamp.UTC().Truncate(usageBucketWidth)
200231
k := start.Unix()

internal/runtime/usage_aggregate_test.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,42 @@ func TestUsageAggregate_Apply_IgnoresNonToolCalls(t *testing.T) {
5656
agg := newUsageAggregate()
5757
ts := time.Date(2026, 6, 1, 10, 0, 0, 0, time.UTC)
5858

59-
// Non tool_call records and empty tool names are ignored.
60-
agg.Apply(&storage.ActivityRecord{Type: storage.ActivityTypePolicyDecision, ServerName: "x", ToolName: "y", Status: "blocked", Timestamp: ts})
59+
// Non-blocked policy decisions and tool_calls with empty tool names are
60+
// ignored. (Blocked policy decisions ARE counted — see the test below.)
61+
agg.Apply(&storage.ActivityRecord{Type: storage.ActivityTypePolicyDecision, ServerName: "x", ToolName: "y", Status: "approved", Timestamp: ts})
6162
agg.Apply(&storage.ActivityRecord{Type: storage.ActivityTypeToolCall, ServerName: "x", ToolName: "", Status: "success", Timestamp: ts})
6263

6364
assert.Empty(t, agg.Tools)
6465
}
6566

67+
// TestUsageAggregate_Apply_CountsBlockedPolicyDecisions (MCP-835 / Codex
68+
// finding #2): blocked tool attempts are persisted as policy_decision records,
69+
// not tool_calls. The aggregate must still count them so the contract's
70+
// per-tool `blocked` field is non-zero. A blocked attempt never executed, so it
71+
// contributes ONLY to Blocked (and LastUsed) — not Calls, latency, bytes, or
72+
// the timeline (which tracks executed calls).
73+
func TestUsageAggregate_Apply_CountsBlockedPolicyDecisions(t *testing.T) {
74+
agg := newUsageAggregate()
75+
ts := time.Date(2026, 6, 1, 10, 0, 0, 0, time.UTC)
76+
77+
agg.Apply(&storage.ActivityRecord{Type: storage.ActivityTypePolicyDecision, ServerName: "github", ToolName: "search", Status: "blocked", Timestamp: ts})
78+
agg.Apply(&storage.ActivityRecord{Type: storage.ActivityTypePolicyDecision, ServerName: "github", ToolName: "search", Status: "blocked", Timestamp: ts.Add(time.Minute)})
79+
80+
tu := agg.Tools[toolKey("github", "search")]
81+
require.NotNil(t, tu, "blocked attempts must create a per-tool entry")
82+
assert.Equal(t, int64(2), tu.Blocked, "both blocked attempts counted")
83+
assert.Equal(t, int64(0), tu.Calls, "blocked attempts are not executed calls")
84+
assert.Equal(t, int64(0), tu.Errors)
85+
assert.Equal(t, ts.Add(time.Minute), tu.LastUsed, "LastUsed tracks the latest attempt")
86+
87+
var latencyTotal int64
88+
for _, c := range tu.LatencyBuckets {
89+
latencyTotal += c
90+
}
91+
assert.Equal(t, int64(0), latencyTotal, "blocked attempts have no latency sample")
92+
assert.Empty(t, agg.Buckets, "blocked attempts do not enter the executed-call timeline")
93+
}
94+
6695
func TestToolUsage_Averages_ExcludeZeroByteCalls(t *testing.T) {
6796
agg := newUsageAggregate()
6897
ts := time.Date(2026, 6, 1, 10, 0, 0, 0, time.UTC)

internal/runtime/usage_service_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,34 @@ func TestActivityService_HandleEvent_AppliesToolCallToUsage(t *testing.T) {
132132
assert.Equal(t, int64(128), tu.ReqBytesSum)
133133
}
134134

135+
// TestActivityService_HandleEvent_CountsBlockedPolicyDecision (MCP-835 / Codex
136+
// finding #2): a blocked tool attempt is emitted as a policy_decision event,
137+
// not a tool_call. The live path must fold it into the usage aggregate so the
138+
// per-tool `blocked` count is non-zero — matching what a cold-start scan would
139+
// rebuild from the persisted record.
140+
func TestActivityService_HandleEvent_CountsBlockedPolicyDecision(t *testing.T) {
141+
svc, _ := newUsageTestService(t)
142+
evt := Event{
143+
Type: EventTypeActivityPolicyDecision,
144+
Timestamp: time.Date(2026, 6, 1, 10, 0, 0, 0, time.UTC),
145+
Payload: map[string]any{
146+
"server_name": "github",
147+
"tool_name": "search",
148+
"decision": "blocked",
149+
"reason": "Server is quarantined for security review",
150+
},
151+
}
152+
153+
svc.handleEvent(evt)
154+
155+
snap := svc.UsageSnapshot()
156+
require.NotNil(t, snap)
157+
tu := snap.Tools[toolKey("github", "search")]
158+
require.NotNil(t, tu, "blocked policy decision must reach the usage aggregate")
159+
assert.Equal(t, int64(1), tu.Blocked)
160+
assert.Equal(t, int64(0), tu.Calls, "blocked attempt is not an executed call")
161+
}
162+
135163
func TestActivityService_SetUsagePersistInterval_HotReload(t *testing.T) {
136164
svc, _ := newUsageTestService(t)
137165
assert.Equal(t, DefaultUsagePersistInterval, svc.usagePersistInterval())

specs/069-observability-usage-graphs/contracts/usage-endpoint.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ Auth: `X-API-Key` (REST default).
5555
- `token_source: "bytes"` labels the size-based proxy (FR-006); FR-010 will switch this to `"estimated_tokens"`.
5656
- `tokens_saved*` echoed from existing `ServerTokenMetrics` (FR-007 / SC-008).
5757
- `avg_*` computed over `sized_calls` only (records with `0` bytes excluded); `null`/omitted when `sized_calls == 0`.
58+
- `blocked` counts policy-prevented attempts (persisted as blocked `policy_decision` records, not executed tool_calls). A blocked attempt never ran, so it is **not** included in `calls` and contributes no latency/bytes — it only increments `blocked` and `last_used`. The timeline tracks executed calls and excludes blocked attempts.
5859
- `other` present only when the tool list was truncated to `top`.
5960
- Empty log → `tools: []`, `timeline: []`, `tokens_saved` from metrics (or 0) — never an error (FR-009 / SC-007).
6061

0 commit comments

Comments
 (0)