Skip to content

Commit eb59ca5

Browse files
kjgbotkjgbotclaude
authored
fix(writeback): service-side delete-storm breaker + boot staleness gate (#249) (#251)
* test(writeback): pin service-side delete-storm breaker contract (#249) Failing tests first, per the in-run incident design (ratified in #249): the 2026-06-06 storm destroyed 149+ canonical records via ~186 admitted file_delete ops; the binary guard refuses false diffs client-side, this breaker is the layer that survives future client bugs. - write-time: DeleteFile refuses admissions beyond DeleteStormThreshold within DeleteStormWindow (ErrDeleteStormRejected, 429 at the API; the FS MUTATION is refused — the incident destroyed records even though every provider call failed) - boot-time: the construction scan re-arms every pending/running op at every restart (a deploy is a restart); a rehydrated pending delete burst above threshold is quarantined (outside the boot allowlist, invisible to /writeback/pending, unreplayable) instead of re-armed - disabled unless DeleteStormThreshold > 0; window latches while a storm is ongoing and clears when it goes quiet API surface (options, sentinel, status, handler mapping) lands with stub bodies so the red state is 4 meaningful test failures, not a build failure. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix(writeback): implement service-side delete-storm breaker (#249) Sliding-window admission control in DeleteFile (per-workspace, latching) plus boot-scan quarantine of rehydrated pending delete bursts, exactly as pinned by the red commit. Quarantined ops are terminal by construction: the boot allowlist (only pending/running re-enqueue) skips them, GetPendingWritebacks filters them, and ReplayOperation refuses anything not dead_lettered. Open review question (flagged in-code): fork-commit deletes (CommitFork → recordWriteLocked) do not share the window; the storm came through the direct DeleteFile path. Decide in review whether to extend. Closes #249 (service-side layer; binary-side guard tracked in the same issue) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * test(writeback): pin boot staleness gate for stuck-running ops (#249) The freeze closure surfaced op_20440: a stuck-running UPSERT of a draft at a canonical message path — a chat.postMessage duplicate armed for the next restart, a class the delete-scoped breaker misses. Generalized criterion from the incident: any non-terminal op is armed, regardless of verb. running-at-boot is definitionally abandoned (the executor died in a previous process life); stale ones must quarantine instead of re-arm, while fresh ones keep at-least-once crash semantics. Red: StaleRunningOpThreshold option exists, gate not implemented. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix(writeback): boot staleness gate quarantines stuck-running ops (#249) Verb-agnostic: an op still 'running' at boot executed in a previous process life — its executor is dead. Older than StaleRunningOpThreshold = abandoned → quarantined (op_20440 class: a stuck-running upsert is an armed chat.postMessage duplicate exactly like a stuck delete). Fresher running ops keep at-least-once crash semantics and re-arm as before. Unparseable UpdatedAt on a dead-process op is treated as stale rather than re-armed blind. Disabled when unconfigured. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> --------- Co-authored-by: kjgbot <kjgbot@agentrelay.dev> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 232a25a commit eb59ca5

5 files changed

Lines changed: 531 additions & 0 deletions

File tree

internal/httpapi/server.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,6 +1823,12 @@ func (s *Server) handleDeleteFile(w http.ResponseWriter, r *http.Request, worksp
18231823
writeError(w, http.StatusNotFound, "not_found", err.Error(), correlationID)
18241824
case relayfile.ErrMissingPrecondition:
18251825
writeError(w, http.StatusPreconditionFailed, "precondition_failed", err.Error(), correlationID)
1826+
case relayfile.ErrDeleteStormRejected:
1827+
// #249: deliberate breaker trip, not a server fault — 429 tells
1828+
// well-behaved clients to back off and makes storms visible in
1829+
// access logs as a distinct class.
1830+
w.Header().Set("Retry-After", "60")
1831+
writeError(w, http.StatusTooManyRequests, "delete_storm_rejected", err.Error(), correlationID)
18261832
default:
18271833
writeError(w, http.StatusInternalServerError, "internal_error", err.Error(), correlationID)
18281834
}

internal/relayfile/delete_storm.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package relayfile
2+
3+
import (
4+
"errors"
5+
"time"
6+
)
7+
8+
// Delete-storm breaker (issue #249, service-side layer).
9+
//
10+
// The 2026-06-06 incident: stale private mount state over a layout change made
11+
// a client push file_delete for every tracked record — the service admitted
12+
// ~186 canonical-record deletions from one mount correlation family within
13+
// minutes. The binary-side guard (#249 main body) refuses the false diff at
14+
// the client; this breaker is the layer that survives future client bugs: a
15+
// workspace's legitimate delete rate is near zero, so a burst is treated as
16+
// hostile until proven otherwise.
17+
//
18+
// Two protected surfaces, matching the incident's two arming paths:
19+
// - WRITE TIME: DeleteFile refuses admissions beyond the threshold within
20+
// the sliding window. The fs mutation itself is refused — the incident's
21+
// actual harm was the fs damage (every provider call failed and the
22+
// records were still destroyed).
23+
// - BOOT TIME: the construction-time scan re-enqueues every pending/running
24+
// op (a deploy is a restart). A rehydrated burst of pending file_delete
25+
// ops above the threshold is flipped to "quarantined" instead:
26+
// outside the boot allowlist, invisible to /writeback/pending, and not
27+
// replayable (ReplayOperation requires dead_lettered).
28+
//
29+
// Disabled unless DeleteStormThreshold > 0 — enabling it (and sizing the
30+
// threshold) is a deployment decision.
31+
32+
// ErrDeleteStormRejected is returned by DeleteFile when the workspace's
33+
// file_delete admission rate exceeds the configured threshold within the
34+
// configured window.
35+
var ErrDeleteStormRejected = errors.New("delete storm rejected: file_delete burst exceeds configured threshold")
36+
37+
const defaultDeleteStormWindow = time.Minute
38+
39+
// admitDeleteLocked records a file_delete admission attempt and reports
40+
// whether it may proceed. Caller must hold s.mu. Refused attempts are also
41+
// recorded so the breaker stays latched while a storm is ongoing; the window
42+
// must go quiet before deletes are admitted again.
43+
func (s *Store) admitDeleteLocked(workspaceID string, now time.Time) error {
44+
if s.deleteStormThreshold <= 0 {
45+
return nil
46+
}
47+
window := s.deleteStormWindow
48+
if window <= 0 {
49+
window = defaultDeleteStormWindow
50+
}
51+
cutoff := now.Add(-window)
52+
admissions := s.deleteStormAdmissions[workspaceID]
53+
kept := admissions[:0]
54+
for _, ts := range admissions {
55+
if ts.After(cutoff) {
56+
kept = append(kept, ts)
57+
}
58+
}
59+
if len(kept) >= s.deleteStormThreshold {
60+
s.deleteStormAdmissions[workspaceID] = append(kept, now)
61+
return ErrDeleteStormRejected
62+
}
63+
s.deleteStormAdmissions[workspaceID] = append(kept, now)
64+
return nil
65+
}
66+
67+
// quarantineBootDeleteStorms runs once at store construction, after state
68+
// load and before the boot re-enqueue scan. Workspaces holding more
69+
// pending/running file_delete ops than the threshold get that entire delete
70+
// set flipped to "quarantined": the boot allowlist then skips them, external
71+
// consumers never see them, and ReplayOperation refuses them. Releasing a
72+
// quarantined op is a deliberate operator action (ack it, or a future
73+
// explicit release API) — never automatic.
74+
// Two verb-asymmetric rules, matching the two armed classes the 2026-06-06
75+
// incident produced:
76+
//
77+
// - DELETE STORMS: a rehydrated pending/running file_delete burst above
78+
// DeleteStormThreshold is the storm signature at a different entry point.
79+
// - STALE RUNNING (op_20440 class, verb-agnostic): any op still "running"
80+
// at boot executed in a previous process life — its executor is dead.
81+
// Ones older than StaleRunningOpThreshold are abandoned and quarantined;
82+
// fresher ones keep the existing at-least-once crash semantics and
83+
// re-arm as before. A stuck-running upsert is an armed provider
84+
// duplicate exactly like a stuck delete.
85+
func (s *Store) quarantineBootDeleteStorms() {
86+
if s.deleteStormThreshold <= 0 && s.staleRunningOpThreshold <= 0 {
87+
return
88+
}
89+
s.mu.Lock()
90+
defer s.mu.Unlock()
91+
now := time.Now().UTC()
92+
changed := false
93+
for _, ws := range s.workspaces {
94+
quarantine := make([]string, 0)
95+
96+
if s.deleteStormThreshold > 0 {
97+
stormOps := make([]string, 0)
98+
for opID, op := range ws.Ops {
99+
if (op.Status == "pending" || op.Status == "running") && op.Action == string(WritebackActionFileDelete) {
100+
stormOps = append(stormOps, opID)
101+
}
102+
}
103+
if len(stormOps) > s.deleteStormThreshold {
104+
quarantine = append(quarantine, stormOps...)
105+
}
106+
}
107+
108+
if s.staleRunningOpThreshold > 0 {
109+
for opID, op := range ws.Ops {
110+
if op.Status != "running" {
111+
continue
112+
}
113+
updatedAt, err := time.Parse(time.RFC3339Nano, op.UpdatedAt)
114+
if err != nil {
115+
// Unparseable age on an op from a dead process life:
116+
// treat as stale rather than re-arm blind.
117+
quarantine = append(quarantine, opID)
118+
continue
119+
}
120+
if now.Sub(updatedAt) > s.staleRunningOpThreshold {
121+
quarantine = append(quarantine, opID)
122+
}
123+
}
124+
}
125+
126+
if len(quarantine) == 0 {
127+
continue
128+
}
129+
nowTS := nowRFC3339NanoUTC()
130+
for _, opID := range quarantine {
131+
op := ws.Ops[opID]
132+
if op.Status == "quarantined" {
133+
continue
134+
}
135+
op.Status = "quarantined"
136+
op.NextAttemptAt = nil
137+
op.UpdatedAt = nowTS
138+
ws.Ops[opID] = op
139+
}
140+
changed = true
141+
}
142+
if changed {
143+
_ = s.saveLocked()
144+
}
145+
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package relayfile
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
"testing"
7+
"time"
8+
)
9+
10+
// Issue #249 (service-side layer): the 2026-06-06 delete storm destroyed 149+
11+
// canonical records when stale private mount state met a layout change — the
12+
// service admitted ~186 file_delete ops from one mount correlation family in
13+
// minutes without blinking. The binary-side guard refuses false diffs at the
14+
// client; this breaker is the layer that survives future client bugs.
15+
//
16+
// Contract (from the in-issue design, ratified in-run):
17+
// - Sliding-window burst detection at the write API (DeleteFile/BulkWrite)
18+
// BEFORE recordWriteLocked: a workspace exceeding the configured number
19+
// of file_delete admissions within the window gets ErrDeleteStormRejected
20+
// — the fs mutation itself is refused (the incident's real harm was the
21+
// fs damage; every provider call failed and records were still
22+
// destroyed).
23+
// - The breaker also covers BOOT RE-ENQUEUE (store.go boot scan re-arms
24+
// every pending/running op at every restart — including deploys): a
25+
// rehydrated burst of pending file_delete ops above the threshold is
26+
// quarantined (status "quarantined": outside the boot allowlist, not
27+
// replayable, not shown to external consumers) instead of re-enqueued.
28+
// - Disabled unless DeleteStormThreshold > 0 — defaults are a deploy
29+
// decision, not a library surprise.
30+
31+
func newStormStore(t *testing.T, threshold int, window time.Duration) *Store {
32+
t.Helper()
33+
store := NewStoreWithOptions(StoreOptions{
34+
ExternalWritebackMode: true,
35+
DeleteStormThreshold: threshold,
36+
DeleteStormWindow: window,
37+
})
38+
t.Cleanup(store.Close)
39+
return store
40+
}
41+
42+
func seedStormFiles(t *testing.T, store *Store, workspaceID string, count int) []string {
43+
t.Helper()
44+
paths := make([]string, 0, count)
45+
for i := 0; i < count; i++ {
46+
path := fmt.Sprintf("/slack/channels/C0STORM/messages/%d.json", i)
47+
if _, err := store.WriteFile(WriteRequest{
48+
WorkspaceID: workspaceID,
49+
Path: path,
50+
IfMatch: "0",
51+
ContentType: "application/json",
52+
Content: `{"text":"x"}`,
53+
CorrelationID: fmt.Sprintf("corr_seed_%d", i),
54+
}); err != nil {
55+
t.Fatalf("seed write %d failed: %v", i, err)
56+
}
57+
paths = append(paths, path)
58+
}
59+
return paths
60+
}
61+
62+
func deleteStormPath(store *Store, workspaceID, path string, n int) error {
63+
_, err := store.DeleteFile(DeleteRequest{
64+
WorkspaceID: workspaceID,
65+
Path: path,
66+
IfMatch: "*",
67+
CorrelationID: fmt.Sprintf("mount_1780738999_storm_%d", n),
68+
})
69+
return err
70+
}
71+
72+
func TestDeleteStormBreakerRefusesBurst(t *testing.T) {
73+
store := newStormStore(t, 5, time.Minute)
74+
paths := seedStormFiles(t, store, "ws_storm", 8)
75+
76+
for i := 0; i < 5; i++ {
77+
if err := deleteStormPath(store, "ws_storm", paths[i], i); err != nil {
78+
t.Fatalf("delete %d under threshold must succeed: %v", i, err)
79+
}
80+
}
81+
err := deleteStormPath(store, "ws_storm", paths[5], 5)
82+
if err != ErrDeleteStormRejected {
83+
t.Fatalf("delete above threshold must trip the breaker, got %v", err)
84+
}
85+
86+
// The refused delete must not have mutated the fs: the record survives.
87+
if _, err := store.ReadFile("ws_storm", paths[5]); err != nil {
88+
t.Fatalf("refused delete must leave the record intact: %v", err)
89+
}
90+
// And no operation/writeback may exist for it.
91+
feed, err := store.ListOperations("ws_storm", "", string(WritebackActionFileDelete), "", "", 1000)
92+
if err != nil {
93+
t.Fatalf("list operations failed: %v", err)
94+
}
95+
for _, op := range feed.Items {
96+
if op.Path == paths[5] {
97+
t.Fatalf("refused delete must not create an operation, got %+v", op)
98+
}
99+
}
100+
}
101+
102+
func TestDeleteStormBreakerHTTPErrorCode(t *testing.T) {
103+
// The sentinel must be distinguishable so httpapi can map it (429-class)
104+
// instead of a generic 500.
105+
if ErrDeleteStormRejected == nil {
106+
t.Fatalf("ErrDeleteStormRejected must be defined")
107+
}
108+
if !strings.Contains(ErrDeleteStormRejected.Error(), "delete storm") {
109+
t.Fatalf("sentinel should self-describe, got %q", ErrDeleteStormRejected.Error())
110+
}
111+
}
112+
113+
func TestDeleteStormBreakerDisabledWhenUnconfigured(t *testing.T) {
114+
store := NewStoreWithOptions(StoreOptions{ExternalWritebackMode: true})
115+
t.Cleanup(store.Close)
116+
paths := seedStormFiles(t, store, "ws_storm", 12)
117+
for i, path := range paths {
118+
if err := deleteStormPath(store, "ws_storm", path, i); err != nil {
119+
t.Fatalf("breaker must be inert when threshold unset; delete %d failed: %v", i, err)
120+
}
121+
}
122+
}
123+
124+
// Note for review: bulk deletes flow through fork commits (CommitFork →
125+
// recordWriteLocked with file.deleted), not BulkWrite. The storm came through
126+
// the direct DeleteFile path, which this breaker covers; whether fork-commit
127+
// deletes should share the window is an explicit open question in the PR.
128+
129+
func TestDeleteStormBreakerWindowExpires(t *testing.T) {
130+
store := newStormStore(t, 3, 50*time.Millisecond)
131+
paths := seedStormFiles(t, store, "ws_storm", 7)
132+
133+
for i := 0; i < 3; i++ {
134+
if err := deleteStormPath(store, "ws_storm", paths[i], i); err != nil {
135+
t.Fatalf("delete %d failed: %v", i, err)
136+
}
137+
}
138+
if err := deleteStormPath(store, "ws_storm", paths[3], 3); err != ErrDeleteStormRejected {
139+
t.Fatalf("4th delete inside window must trip, got %v", err)
140+
}
141+
time.Sleep(80 * time.Millisecond)
142+
if err := deleteStormPath(store, "ws_storm", paths[4], 4); err != nil {
143+
t.Fatalf("delete after window expiry must succeed: %v", err)
144+
}
145+
}
146+
147+
func TestBootRequeueQuarantinesPendingDeleteStorm(t *testing.T) {
148+
backend := NewInMemoryStateBackend()
149+
150+
// Phase 1: a store WITHOUT the breaker admits a storm of deletes whose
151+
// ops stay pending (external mode, never acked) — the incident shape
152+
// persisted to the state backend.
153+
first := NewStoreWithOptions(StoreOptions{
154+
ExternalWritebackMode: true,
155+
StateBackend: backend,
156+
})
157+
paths := seedStormFiles(t, first, "ws_storm", 10)
158+
for i, path := range paths {
159+
if err := deleteStormPath(first, "ws_storm", path, i); err != nil {
160+
t.Fatalf("storm delete %d failed: %v", i, err)
161+
}
162+
}
163+
first.Close()
164+
165+
// Phase 2: a new store (a deploy) boots over that state WITH the breaker
166+
// configured. The boot scan must quarantine the pending delete burst
167+
// instead of re-arming it into the queue.
168+
second := NewStoreWithOptions(StoreOptions{
169+
ExternalWritebackMode: true,
170+
StateBackend: backend,
171+
DeleteStormThreshold: 5,
172+
DeleteStormWindow: time.Minute,
173+
})
174+
t.Cleanup(second.Close)
175+
176+
// The seed upserts legitimately re-arm (that's existing boot behavior);
177+
// the breaker must remove exactly the DELETE burst from the armed set.
178+
armedDeletes, err := second.ListOperations("ws_storm", "pending", string(WritebackActionFileDelete), "", "", 1000)
179+
if err != nil {
180+
t.Fatalf("list pending deletes failed: %v", err)
181+
}
182+
if len(armedDeletes.Items) != 0 {
183+
t.Fatalf("boot must not re-arm a pending delete storm, got %d armed deletes", len(armedDeletes.Items))
184+
}
185+
feed, err := second.ListOperations("ws_storm", "quarantined", "", "", "", 1000)
186+
if err != nil {
187+
t.Fatalf("list quarantined failed: %v", err)
188+
}
189+
quarantinedDeletes := 0
190+
for _, op := range feed.Items {
191+
if op.Action == string(WritebackActionFileDelete) {
192+
quarantinedDeletes++
193+
}
194+
}
195+
if quarantinedDeletes != 10 {
196+
t.Fatalf("expected all 10 storm deletes quarantined at boot, got %d", quarantinedDeletes)
197+
}
198+
// And none of the quarantined deletes may sit in the external queue.
199+
for _, item := range second.GetPendingWritebacks("ws_storm") {
200+
for _, op := range feed.Items {
201+
if item["id"] == op.OpID {
202+
t.Fatalf("quarantined op %s leaked into the pending queue", op.OpID)
203+
}
204+
}
205+
}
206+
}
207+
208+
func TestQuarantinedOpsAreNotReplayable(t *testing.T) {
209+
backend := NewInMemoryStateBackend()
210+
first := NewStoreWithOptions(StoreOptions{ExternalWritebackMode: true, StateBackend: backend})
211+
paths := seedStormFiles(t, first, "ws_storm", 6)
212+
for i, path := range paths {
213+
if err := deleteStormPath(first, "ws_storm", path, i); err != nil {
214+
t.Fatalf("storm delete %d failed: %v", i, err)
215+
}
216+
}
217+
first.Close()
218+
219+
second := NewStoreWithOptions(StoreOptions{
220+
ExternalWritebackMode: true,
221+
StateBackend: backend,
222+
DeleteStormThreshold: 3,
223+
DeleteStormWindow: time.Minute,
224+
})
225+
t.Cleanup(second.Close)
226+
227+
feed, err := second.ListOperations("ws_storm", "quarantined", "", "", "", 10)
228+
if err != nil || len(feed.Items) == 0 {
229+
t.Fatalf("expected quarantined ops, err=%v items=%d", err, len(feed.Items))
230+
}
231+
if _, err := second.ReplayOperation("ws_storm", feed.Items[0].OpID, "corr_replay"); err != ErrInvalidState {
232+
t.Fatalf("quarantined ops must not be replayable, got %v", err)
233+
}
234+
}

0 commit comments

Comments
 (0)