Skip to content

Commit 6dd7bbc

Browse files
committed
fix(lib/httpapi): cap stored errors in EventEmitter to prevent unbounded growth
Previously EventEmitter.errors grew without bound, leaking memory on long-running servers and sending all historical errors to new SSE subscribers. Cap the slice to the most recent 100 errors (maxStoredErrors) so late subscribers still receive useful context without unbounded memory use.
1 parent 54c38eb commit 6dd7bbc

2 files changed

Lines changed: 71 additions & 1 deletion

File tree

lib/httpapi/events.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ func convertStatus(status st.ConversationStatus) AgentStatus {
9494

9595
const defaultSubscriptionBufSize uint = 1024
9696

97+
// maxStoredErrors caps the number of errors retained for late subscribers.
98+
const maxStoredErrors = 100
99+
97100
type EventEmitterOption func(*EventEmitter)
98101

99102
func WithSubscriptionBufSize(size uint) EventEmitterOption {
@@ -224,8 +227,11 @@ func (e *EventEmitter) EmitError(message string, level st.ErrorLevel) {
224227
Time: e.clock.Now(),
225228
}
226229

227-
// Store the error so new subscribers can receive all errors
230+
// Store the error so new subscribers can receive recent errors.
228231
e.errors = append(e.errors, errorBody)
232+
if len(e.errors) > maxStoredErrors {
233+
e.errors = e.errors[len(e.errors)-maxStoredErrors:]
234+
}
229235

230236
e.notifyChannels(EventTypeError, errorBody)
231237
}

lib/httpapi/events_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,70 @@ func TestEventEmitter(t *testing.T) {
9999
}
100100
})
101101

102+
t.Run("error-cap", func(t *testing.T) {
103+
emitter := NewEventEmitter(WithSubscriptionBufSize(10))
104+
105+
for i := range 150 {
106+
emitter.EmitError(fmt.Sprintf("error %d", i), st.ErrorLevelError)
107+
}
108+
109+
_, _, stateEvents := emitter.Subscribe()
110+
111+
var errorEvents []Event
112+
for _, ev := range stateEvents {
113+
if ev.Type == EventTypeError {
114+
errorEvents = append(errorEvents, ev)
115+
}
116+
}
117+
118+
assert.Len(t, errorEvents, maxStoredErrors)
119+
120+
// Errors should be the last 100: "error 50" through "error 149".
121+
for i, ev := range errorEvents {
122+
body, ok := ev.Payload.(ErrorBody)
123+
assert.True(t, ok)
124+
assert.Equal(t, fmt.Sprintf("error %d", i+50), body.Message)
125+
}
126+
})
127+
128+
t.Run("error-events-in-initial-state", func(t *testing.T) {
129+
mockClock := quartz.NewMock(t)
130+
fixedTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
131+
mockClock.Set(fixedTime)
132+
133+
emitter := NewEventEmitter(WithClock(mockClock), WithSubscriptionBufSize(10))
134+
135+
emitter.EmitError("err1", st.ErrorLevelError)
136+
mockClock.Set(fixedTime.Add(1 * time.Second))
137+
emitter.EmitError("err2", st.ErrorLevelWarning)
138+
mockClock.Set(fixedTime.Add(2 * time.Second))
139+
emitter.EmitError("err3", st.ErrorLevelError)
140+
141+
_, _, stateEvents := emitter.Subscribe()
142+
143+
var errorEvents []Event
144+
for _, ev := range stateEvents {
145+
if ev.Type == EventTypeError {
146+
errorEvents = append(errorEvents, ev)
147+
}
148+
}
149+
150+
assert.Len(t, errorEvents, 3)
151+
152+
expected := []ErrorBody{
153+
{Message: "err1", Level: st.ErrorLevelError, Time: fixedTime},
154+
{Message: "err2", Level: st.ErrorLevelWarning, Time: fixedTime.Add(1 * time.Second)},
155+
{Message: "err3", Level: st.ErrorLevelError, Time: fixedTime.Add(2 * time.Second)},
156+
}
157+
for i, ev := range errorEvents {
158+
body, ok := ev.Payload.(ErrorBody)
159+
assert.True(t, ok)
160+
assert.Equal(t, expected[i].Message, body.Message)
161+
assert.Equal(t, expected[i].Level, body.Level)
162+
assert.Equal(t, expected[i].Time, body.Time)
163+
}
164+
})
165+
102166
t.Run("clock-injection", func(t *testing.T) {
103167
mockClock := quartz.NewMock(t)
104168
fixedTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

0 commit comments

Comments
 (0)